I want to subscribe to a Durable Topic endpoint. I am able to subscribe to the direct topic. But my program is not working to when I try to subscribe from DTE.
Program:
final JCSMPProperties properties = new JCSMPProperties();
properties.setProperty(JCSMPProperties.HOST, host); // host:port
properties.setProperty(JCSMPProperties.USERNAME, username); // client-username
properties.setProperty(JCSMPProperties.VPN_NAME, vpn); // message-vpn
properties.setProperty(JCSMPProperties.PASSWORD, password); // client-password
properties.setBooleanProperty(JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false);
final JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties);
session.connect();
TopicEndpoint topicEndpoint = JCSMPFactory.onlyInstance().createDurableTopicEndpointEx("DTE_Topic");
final CountDownLatch latch = new CountDownLatch(20); // used for synchronizing b/w threads
// Create a Flow be able to bind to and consume messages from the Queue.
final ConsumerFlowProperties flow_prop = new ConsumerFlowProperties();
flow_prop.setEndpoint(topicEndpoint);
flow_prop.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);
EndpointProperties endpointProvisionProperties = new EndpointProperties();
endpointProvisionProperties.setRespectsMsgTTL(true);
Topic topic = JCSMPFactory.onlyInstance().createTopic("Topic");
final FlowReceiver receiver = session.createFlow(topicEndpoint, topic, new XMLMessageListener() {
@Override
public void onReceive(BytesXMLMessage msg) {
if (msg instanceof TextMessage) {
System.out.printf("TextMessage received1: '%s'%n", ((TextMessage) msg).getText());
} else {
System.out.println("Message received.");
}
System.out.printf("Message Dump:%n%s%n", msg.dump());
msg.ackMessage();
latch.countDown(); // unblock main thread
}
@Override
public void onException(JCSMPException e) {
System.out.printf("Consumer received exception: %s%n", e);
latch.countDown(); // unblock main thread
}
});
session.provision(topicEndpoint, endpointProvisionProperties, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);
//session.addSubscription(topicEndpoint);
session.addSubscription(topicEndpoint, topic, 0);
// Start the consumer
System.out.println("Connected. Awaiting message ...");
receiver.start();
try {
latch.await(); // block here until message received, and latch will flip
} catch (InterruptedException e) {
System.out.println("I was awoken while waiting");
}
// Close consumer
receiver.close();
System.out.println("Exiting.");
session.closeSession();
Exception:
Exception in thread “main” java.lang.IllegalArgumentException: Endpoint must be a Queue or ClientName
at com.solacesystems.jcsmp.impl.JCSMPBasicSession.addSubscription(JCSMPBasicSession.java:899)
at com.myak.solace.SubscriberTopic.main(SubscriberTopic.java:84)