Consume from Durable Topic Endpoint using JCSMP
I want to subscribe to a Durable Topic Endpoint (DTE). I am able to subscribe to the direct topic, but my program fails when I try to subscribe to the DTE.
Program:
final JCSMPProperties properties = new JCSMPProperties();
properties.setProperty(JCSMPProperties.HOST, host); // host:port
properties.setProperty(JCSMPProperties.USERNAME, username); // client-username
properties.setProperty(JCSMPProperties.VPN_NAME, vpn); // message-vpn
properties.setProperty(JCSMPProperties.PASSWORD, password); // client-password
properties.setBooleanProperty(JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false);
final JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties);
session.connect();

TopicEndpoint topicEndpoint = JCSMPFactory.onlyInstance().createDurableTopicEndpointEx("DTE_Topic");
final CountDownLatch latch = new CountDownLatch(20); // used for synchronizing b/w threads

// Create a Flow be able to bind to and consume messages from the Queue.
final ConsumerFlowProperties flow_prop = new ConsumerFlowProperties();
flow_prop.setEndpoint(topicEndpoint);
flow_prop.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);

EndpointProperties endpointProvisionProperties = new EndpointProperties();
endpointProvisionProperties.setRespectsMsgTTL(true);

Topic topic = JCSMPFactory.onlyInstance().createTopic("Topic");
final FlowReceiver receiver = session.createFlow(topicEndpoint, topic, new XMLMessageListener() {
    @Override
    public void onReceive(BytesXMLMessage msg) {
        if (msg instanceof TextMessage) {
            System.out.printf("TextMessage received1: '%s'%n", ((TextMessage) msg).getText());
        } else {
            System.out.println("Message received.");
        }
        System.out.printf("Message Dump:%n%s%n", msg.dump());
        msg.ackMessage();
        latch.countDown(); // unblock main thread
    }

    @Override
    public void onException(JCSMPException e) {
        System.out.printf("Consumer received exception: %s%n", e);
        latch.countDown(); // unblock main thread
    }
});

session.provision(topicEndpoint, endpointProvisionProperties, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);
//session.addSubscription(topicEndpoint);
session.addSubscription(topicEndpoint, topic, 0);

// Start the consumer
System.out.println("Connected. Awaiting message ...");
receiver.start();
try {
    latch.await(); // block here until message received, and latch will flip
} catch (InterruptedException e) {
    System.out.println("I was awoken while waiting");
}

// Close consumer
receiver.close();
System.out.println("Exiting.");
session.closeSession();
Exception:
Exception in thread "main" java.lang.IllegalArgumentException: Endpoint must be a Queue or ClientName
at com.solacesystems.jcsmp.impl.JCSMPBasicSession.addSubscription(JCSMPBasicSession.java:899)
at com.myak.solace.SubscriberTopic.main(SubscriberTopic.java:84)
Comments
Hi @nirodha23, you don't need an explicit addSubscription() call for a DTE. It is required only if you are using a Queue endpoint, or if you want to add a subscription to the client directly.
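To make that rule concrete, here is a small stand-alone Java model. It is not JCSMP code: the types `ToyQueue` and `ToyTopicEndpoint` and the static methods `addSubscription` and `createFlow` are hypothetical stand-ins written only for this illustration, and the client-name case from the error message is left out. The sketch mimics the check reported in the stack trace and shows the topic endpoint receiving its topic when the flow is created instead.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these types are the real JCSMP classes.
final class DteSubscriptionModel {

    interface ToyEndpoint { String name(); }

    // Hypothetical queue: may carry any number of added topic subscriptions.
    static final class ToyQueue implements ToyEndpoint {
        final String name;
        final List<String> subscriptions = new ArrayList<>();
        ToyQueue(String name) { this.name = name; }
        public String name() { return name; }
    }

    // Hypothetical topic endpoint: holds one topic, set when a flow binds.
    static final class ToyTopicEndpoint implements ToyEndpoint {
        final String name;
        String boundTopic;
        ToyTopicEndpoint(String name) { this.name = name; }
        public String name() { return name; }
    }

    // Mimics the check seen in the stack trace: only a queue is accepted here.
    static void addSubscription(ToyEndpoint endpoint, String topic) {
        if (!(endpoint instanceof ToyQueue)) {
            throw new IllegalArgumentException("Endpoint must be a Queue or ClientName");
        }
        ((ToyQueue) endpoint).subscriptions.add(topic);
    }

    // The topic endpoint gets its topic as part of creating the flow.
    static void createFlow(ToyTopicEndpoint endpoint, String topic) {
        endpoint.boundTopic = topic;
    }

    public static void main(String[] args) {
        ToyTopicEndpoint dte = new ToyTopicEndpoint("DTE_Topic");
        createFlow(dte, "Topic"); // enough for a topic endpoint
        System.out.println("DTE bound to: " + dte.boundTopic);

        try {
            addSubscription(dte, "Topic"); // same mistake as in the question
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }

        ToyQueue queue = new ToyQueue("Q_Example"); // hypothetical queue name
        addSubscription(queue, "Topic"); // accepted for a queue
        System.out.println("Queue subscriptions: " + queue.subscriptions);
    }
}
```

In the program from the question, the corresponding change would presumably be to delete the session.addSubscription(topicEndpoint, topic, 0) line and rely on the topic already passed to session.createFlow(topicEndpoint, topic, ...).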
Here is a sample DTE subscriber implementation.
https://github.com/SolaceSamples/solace-samples-java-jcsmp/blob/master/src/main/java/com/solace/samples/jcsmp/features/SimpleFlowToTopic.java
Thanks
// nram0