subscribe to multiple topics inside same session?

Options
alinaqvi
alinaqvi Member Posts: 35
edited October 2019 in PubSub+ Event Broker #1

TopicSubscriber.java (https://github.com/SolaceSamples/solace-samples-jms/blob/master/src/main/java/com/solace/samples/TopicSubscriber.java) gives a good intro to subscribing to a single topic.

What if we want to subscribe to multiple topics ideally with a listener attached to each of those subscriptions. Do we have example code for that some where please?

Thanks,

Best Answer

Answers

  • [Deleted User]
    [Deleted User] Posts: 0 ✭✭
    Options

    What I am getting from support around this @alinaqvi , is that you would create a second consumer in the same way that we show how the first one is created.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 923 admin
    Options

    There are a few options here.
    1. As @jeremy mentioned, from one session you can create multiple MessageConsumers that each consume from their own endpoint.
    2. You can use wildcards to subscribe to multiple topics, e.g: foo/*/bar or foo/>
    3. Last, and maybe your best option if wildcarding can't give you what you want is to leverage Solace's topic to queue mapping capability. Essentially you would create a queue that attracts messages from a bunch of different topics and then in your code you can just listen to that single queue. See more on that here: https://solace.com/samples/solace-samples-jms/topic-to-queue-mapping/

  • alinaqvi
    alinaqvi Member Posts: 35
    edited October 2019 #5
    Options

    The documentation for session.getMessageConsumer() says:

    "
    NOTE: For a given JCSMPSession, this method returns a new XMLMessageConsumer object on each call, closing the existing consumer if it exists.
    "
    So basically what happens is that at the last consumer that you start will be the active one. This will receive msgs for all registered topics so far. Right now I have a privately held list of listeners that I map to the topic name/s. When I receive a new message from solace I check what topic it was sent on and then call the appropriate listener/s.

    What I would ideally like to do is use solace API similar to pseudo-code below:

    session.registerForTopicUpdates("abc/topic1" , XMLMsgListener1 )
    session.registerForTopicUpdates("abc/topic2" , XMLMsgListener2 )
    ....

  • amackenzie
    amackenzie Member, Employee Posts: 260 Solace Employee
    Options

    @alinaqvi the feature you are looking for is Topic Dispatch. This allows you to set up multiple listeners with topic filters that will dispatch messages to listeners with matching filters.
    Unfortunately, topic dispatch is not available in JCSMP at this time.
    If this functionality is important to you, I suggest logging a support ticket for this as a Feature Request.

  • alinaqvi
    alinaqvi Member Posts: 35
    Options

    Thanks for your responses.
    Is JCSMP the recommended API for java clients?

  • alinaqvi
    alinaqvi Member Posts: 35
    Options

    Also what I am trying to do here ... would it be possible through the spring integration API ?
    Apologies for being pendantic but it seems like a fundamental use case, can you please double check with your dev team.
    Thanks,

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 923 admin
    Options

    Hi,
    Yes! Using our integration with Spring would be a great way to do this.
    Using our Spring Boot JMS Starter & the @JmsListener java annotation would likely be the easiest way to get what you want.
    You can do something like seen in the code below.

    package com.solace.samples.spring.boot;
    
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.TextMessage;
    
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.jms.annotation.JmsListener;
    
    @SpringBootApplication
    public class SpringBootReceiver {
    
        public static void main(String[] args) {
            SpringApplication.run(SpringBootReceiver.class, args);
        }
    
        @JmsListener(destination = "abc/topic1")
        public void handleFoo(Message message) {
    
            if (message instanceof TextMessage) {
                TextMessage tm = (TextMessage) message;
                try {
                    System.out.println("TextMessage Received on destination "+ tm.getJMSDestination());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            } else {
                System.out.println("Message of another type was received");
            }
        }
    
    
        @JmsListener(destination = "abc/topic2")
        public void handleBar(Message message) {
    
            if (message instanceof TextMessage) {
                TextMessage tm = (TextMessage) message;
                try {
                    System.out.println("TextMessage Received on destination " + tm.getJMSDestination());
                } catch (JMSException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            } else {
                System.out.println("Message of another type was received");
            }
        }
    }
    

    Note that by default when using the @JmsListener annotation Spring Boot autoconfigures objects that are set to listen from a queue so if you want to subscribe to a topic you'll need to set this property in your application.properties file (or equivalent).

    spring.jms.pub-sub-domain=true
    

    Hope that helps!

  • alinaqvi
    alinaqvi Member Posts: 35
    Options

    Can you please share the properties for kerberos authentication as well ?
    Also we are using solcache and queues as well in our app flow. Would the SB jms work with those as well ?

  • alinaqvi
    alinaqvi Member Posts: 35
    Options

    This is how I initialized the javax.jms.ConnectionFactory with kerberos auth incase it helps some one else:

    @Bean
    public ConnectionFactory connectionFactory() throws Exception {

        Hashtable<String, Object> env = new Hashtable<>();
        env.put(InitialContext.PROVIDER_URL, jndiProviderURL);
        env.put(Context.SECURITY_PRINCIPAL, username);
        env.put(Context.SECURITY_CREDENTIALS, password); // its a blank string.
    
        System.setProperty("java.security.krb5.conf", KRB5_LOC);
        System.setProperty("java.security.auth.login.config", JAAS_LOC);
    
        env.put(SupportedProperty.SOLACE_JMS_AUTHENTICATION_SCHEME, SupportedProperty.AUTHENTICATION_SCHEME_GSS_KRB);
        env.put(SupportedProperty.SOLACE_JMS_KRB_SERVICE_NAME, "HOST");
    
    
        env.put(SupportedProperty.SOLACE_JMS_VPN, vpn);
        env.put(SupportedProperty.SOLACE_JMS_DYNAMIC_DURABLES, false);
    
        env.put(SupportedProperty.SOLACE_JMS_DELIVER_TO_ONE_OVERRIDE , false);
        SolConnectionFactory cf = SolJmsUtility.createConnectionFactory(env );
    
        return cf;
    
    }
    
  • alinaqvi
    alinaqvi Member Posts: 35
    Options

    Apologies for reviving this old thread but the question is related...

    Is there a unique subscription id generated when we do a subscribe request to a topic (similar to a cache request id) ?

    Topic solaceTopic = JCSMPFactory.onlyInstance().createTopic("abc/topic");
    session.addSubscription(solaceTopic);
    // If I can get or add a unique id for this subscription which would be part of the received messages then I can do a performant routing of messages internally in our app.

    Many Thanks,
    Ali

  • bxb
    bxb Member Posts: 5
    Options

    I have configured "spring.jms.pub-sub-domain=true" and tried this way , but the program always reports an error of : unknown endpoint (503: Unknown Queue).

    it's wired.😅

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 923 admin
    Options

    Hi @bxb,

    I don't know what the issue is off the top of my head. Maybe connecting to the wrong message vpn if you have multiple? This thread is pretty old so only people who commented on it will get notified, can you create a new thread and share more of your code/config? By creating a new discussion more people will see it.

    thank you!