Consume from Durable Topic Endpoint using JCSMP

nirodha23 Member Posts: 2
edited September 2021 in Connectors & Integrations #1

I want to consume from a Durable Topic Endpoint (DTE). I am able to subscribe to the direct topic, but my program fails when I try to consume from the DTE.

Program:

    final JCSMPProperties properties = new JCSMPProperties();
    properties.setProperty(JCSMPProperties.HOST, host);     // host:port
    properties.setProperty(JCSMPProperties.USERNAME, username); // client-username
    properties.setProperty(JCSMPProperties.VPN_NAME,  vpn); // message-vpn
    properties.setProperty(JCSMPProperties.PASSWORD, password); // client-password
    properties.setBooleanProperty(JCSMPProperties.SSL_VALIDATE_CERTIFICATE, false);

    final JCSMPSession session = JCSMPFactory.onlyInstance().createSession(properties);
    session.connect();

    TopicEndpoint topicEndpoint = JCSMPFactory.onlyInstance().createDurableTopicEndpointEx("DTE_Topic");
    final CountDownLatch latch = new CountDownLatch(20); // main thread waits until 20 messages or exceptions have been counted

    // Create a flow to bind to and consume messages from the topic endpoint.
    final ConsumerFlowProperties flow_prop = new ConsumerFlowProperties();
    flow_prop.setEndpoint(topicEndpoint);
    flow_prop.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);

    EndpointProperties endpointProvisionProperties = new EndpointProperties();
    endpointProvisionProperties.setRespectsMsgTTL(true);
    Topic topic = JCSMPFactory.onlyInstance().createTopic("Topic");
    final FlowReceiver receiver = session.createFlow(topicEndpoint, topic, new XMLMessageListener() {
        @Override
        public void onReceive(BytesXMLMessage msg) {
            if (msg instanceof TextMessage) {
                System.out.printf("TextMessage received1: '%s'%n", ((TextMessage) msg).getText());
            } else {
                System.out.println("Message received.");
            }
            System.out.printf("Message Dump:%n%s%n", msg.dump());

            msg.ackMessage();
            latch.countDown(); // unblock main thread
        }

        @Override
        public void onException(JCSMPException e) {
            System.out.printf("Consumer received exception: %s%n", e);
            latch.countDown(); // unblock main thread
        }
    });


    session.provision(topicEndpoint, endpointProvisionProperties, JCSMPSession.FLAG_IGNORE_ALREADY_EXISTS);


    //session.addSubscription(topicEndpoint);
    session.addSubscription(topicEndpoint, topic, 0);
    // Start the consumer
    System.out.println("Connected. Awaiting message ...");
    receiver.start();

    try {
        latch.await(); // block here until the latch counts down to zero
    } catch (InterruptedException e) {
        System.out.println("I was awoken while waiting");
    }
    // Close consumer
    receiver.close();
    System.out.println("Exiting.");
    session.closeSession();

Exception:

    Exception in thread "main" java.lang.IllegalArgumentException: Endpoint must be a Queue or ClientName
        at com.solacesystems.jcsmp.impl.JCSMPBasicSession.addSubscription(JCSMPBasicSession.java:899)
        at com.myak.solace.SubscriberTopic.main(SubscriberTopic.java:84)

Comments