Queue Subscriber without Latch

vshivk99
vshivk99 Member Posts: 13
edited May 2021 in General Discussions #1

Hi,
I am trying to run a QueueSubscriber continuously, without using a latch. Is there any sample code available? I am writing a method, subscribeFromQueue, that consumes messages from a queue. If I keep the latch inside subscribeFromQueue it works, but once control comes back to the main method it does not go back to the subscriber.

        try {
            BytesXMLMessage msg1 = subscribeFromQueue(session, QUEUE_NAME);
            System.out.println("After calling the subscribe");
            BasicReplier(session, msg1, "Our response from BasicReplier");
            System.out.println("After calling the Basic Replier");
        } catch (Exception e) {
            System.out.println("I was awoken while waiting" + e);
        }
    }

    public BytesXMLMessage subscribeFromQueue(JCSMPSession session, String QueueName)
            throws JCSMPException
    {       
        System.out.println("Inside subscribe");
        final EndpointProperties endpointProps = new EndpointProperties();
        // set queue permissions to "consume" and access-type to "exclusive"
        endpointProps.setPermission(EndpointProperties.PERMISSION_CONSUME);
        endpointProps.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);  
        final Queue queue = JCSMPFactory.onlyInstance().createQueue(QueueName);
        // Actually provision it, and do not fail if it already exists
        session.provision(queue, endpointProps, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);
 //       final CountDownLatch latch = new CountDownLatch(1); 

        final ConsumerFlowProperties flow_prop = new ConsumerFlowProperties();
        flow_prop.setEndpoint(queue);
        flow_prop.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);
        EndpointProperties endpoint_props = new EndpointProperties();
        endpoint_props.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);


        final FlowReceiver cons = session.createFlow(new XMLMessageListener() {
            @Override
            public void onReceive(BytesXMLMessage request) {
                System.out.println("Inside onReceive");
                msg = request;

                if (msg instanceof TextMessage) {
                    System.out.printf("TextMessage received: '%s'%n", ((TextMessage) msg).getText());
                }
                msg.ackMessage();
                // latch.countDown();
            }

            @Override
            public void onException(JCSMPException e) {
                System.out.printf("Consumer received exception: %s%n", e);
            }
        }, flow_prop, endpoint_props);
        cons.start();
        try
        {
            System.out.println("Before latch.await()");
            // latch.await(); // block here until message received, and latch will flip
            System.out.println("After latch.await()");
        }
        catch (Exception e)
        {
            System.out.println("Interrupted Exception");
        }
        // cons.close();
        return msg;    
    }
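
With the latch commented out, subscribeFromQueue returns as soon as the flow is started, which is most likely before onReceive has been called, and with the latch in place only one message ever reaches main. One way around both problems is to do the per-message work (including the reply) inside the callback and let the main thread simply stay alive. The following is only a minimal sketch of that pattern using the Java standard library: FakeFlow, deliver and consumeAll are hypothetical names standing in for a started flow and its listener, not part of the JCSMP API.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class CallbackConsumerSketch {

    /** Hypothetical stand-in for a started flow: hands each message to the listener on a dispatcher thread. */
    static final class FakeFlow {
        private final ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        private final Consumer<String> listener;

        FakeFlow(Consumer<String> listener) {
            this.listener = listener;
        }

        /** Simulates the broker pushing a message; the caller does not wait for it to be handled. */
        void deliver(String message) {
            dispatcher.execute(() -> listener.accept(message));
        }

        /** Lets already-queued messages finish, then stops the dispatcher thread. */
        void close() {
            dispatcher.shutdown();
            try {
                if (!dispatcher.awaitTermination(5, TimeUnit.SECONDS)) {
                    throw new IllegalStateException("dispatcher did not stop in time");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
    }

    /** Handles every message inside the callback instead of returning one message to the caller. */
    static List<String> consumeAll(List<String> incoming) {
        List<String> replies = new CopyOnWriteArrayList<>();
        // The "reply" is produced in the listener, where the message is actually available.
        FakeFlow flow = new FakeFlow(msg -> replies.add("Reply to: " + msg));
        for (String m : incoming) {
            flow.deliver(m);
        }
        flow.close(); // a long-running subscriber would block here until shutdown is requested
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(consumeAll(List.of("first", "second", "third")));
    }
}
```

In a real subscriber the main thread would not close the flow straight away; it would wait (for example on a shutdown signal) while the listener keeps handling messages as they arrive.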

Comments

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 921 admin

    Hi @vshivk99,
    @Aaron has actually been working to upgrade some of the samples like the one you're referring to. Check out this GuaranteedSubscriber sample that will show how to consume messages from a queue. If you have any feedback I'm sure @Aaron would love to hear it as well :)

  • vshivk99
    vshivk99 Member Posts: 13

    Hi Marc and @Aaron, the code works well. This is very helpful.

  • vshivk99
    vshivk99 Member Posts: 13

    Thanks for sharing the code