Hi,
I am trying to run a QueueSubscriber continuously, without using a latch. Is there any sample code available? I am writing a method called subscribeFromQueue. If I keep the latch inside subscribeFromQueue it works for the first message, but once control returns to the main method, execution never goes back into the subscriber.
try {
BytesXMLMessage msg1=subscribeFromQueue(session, QUEUE_NAME);
System.out.println("After calling the subscribe");
BasicReplier(session,msg1, "Our response from BasicReplier");
System.out.println("After calling the Basic Replier");
} catch (Exception e) {
System.out.println("I was awoken while waiting"+e);
}
}
/**
 * Provisions the named queue, binds a client-acknowledged consumer flow to it
 * and waits for the first message to be delivered.
 *
 * <p>The flow is deliberately left open when this method returns, so the
 * listener keeps running for every later message: each one is stored in the
 * {@code msg} field, printed if it is a {@code TextMessage}, and acknowledged.
 * Work that must happen for every message (for example sending a reply)
 * belongs inside {@code onReceive}; the caller only ever gets the message
 * that was current when this method returned.
 *
 * @param session   session used to provision the queue and create the flow
 * @param QueueName name of the queue to consume from
 * @return the value of the {@code msg} field once the first message has been
 *         delivered; may be {@code null} if the calling thread was
 *         interrupted before any message arrived
 * @throws JCSMPException if provisioning the queue, creating the flow or
 *                        starting it fails
 */
public BytesXMLMessage subscribeFromQueue(JCSMPSession session, String QueueName)
        throws JCSMPException
{
    System.out.println("Inside subscribe");

    // Queue permissions "consume" and access-type "exclusive", used for provisioning.
    final EndpointProperties endpointProps = new EndpointProperties();
    endpointProps.setPermission(EndpointProperties.PERMISSION_CONSUME);
    endpointProps.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);

    final Queue queue = JCSMPFactory.onlyInstance().createQueue(QueueName);
    // Actually provision it, and do not fail if it already exists.
    session.provision(queue, endpointProps, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);

    final ConsumerFlowProperties flowProps = new ConsumerFlowProperties();
    flowProps.setEndpoint(queue);
    flowProps.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);

    // Separate properties object for the flow: access type only, as before.
    final EndpointProperties flowEndpointProps = new EndpointProperties();
    flowEndpointProps.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);

    // Released by the first delivery. Without it the method would return
    // before the listener had run and hand the caller a null/stale message.
    // Fully qualified so that no additional import is required.
    final java.util.concurrent.CountDownLatch firstMessage =
            new java.util.concurrent.CountDownLatch(1);

    final FlowReceiver cons = session.createFlow(new XMLMessageListener() {
        @Override
        public void onReceive(BytesXMLMessage request) {
            System.out.println("Inside On Recieve");
            // Work on the local reference; the shared field may be replaced
            // by a later delivery.
            msg = request;
            if (request instanceof TextMessage) {
                System.out.printf("TextMessage received: '%s'%n", ((TextMessage) request).getText());
            }
            request.ackMessage();
            // Only the first call has an effect; later calls are no-ops,
            // so the listener can keep serving messages indefinitely.
            firstMessage.countDown();
        }

        @Override
        public void onException(JCSMPException e) {
            System.out.printf("Consumer received exception: %s%n", e);
        }
    }, flowProps, flowEndpointProps);
    cons.start();

    try
    {
        System.out.println("Before latch.await()");
        // Block until the first message is received; countDown()/await()
        // also makes the listener's write to msg visible to this thread.
        firstMessage.await();
        System.out.println("IAfter latch.await()");
    }
    catch (InterruptedException e)
    {
        // Preserve the interrupt for the caller instead of swallowing it.
        Thread.currentThread().interrupt();
        System.out.println("Interrupted Exception");
    }

    // The flow is intentionally NOT closed here: closing it would stop the
    // subscriber, and it must keep receiving after this method returns.
    return msg;
}