Architecting multi queue / multi topic subscriber component

alinaqvi Member Posts: 17

All,
I need your input on what the Solace team advises as best practice.

Our Architecture:
We have a Java app/service that runs independently. It subscribes to multiple topics (both SolCache and normal Solace topics) and multiple queues. The messages on these topics and queues are published by other services/apps.

As per this thread, https://solace.community/discussion/77/subscribe-to-multiple-topics-inside-same-session , the topic dispatch feature isn't available in the JCSMP library at the moment. As I understand it, this limitation would apply to subscribing to multiple queues as well, since the FlowReceiver also creates a new consumer and closes the previous one.

To get around this should we:

1) Write our own TopicDispatch functionality based on bytesXMLMessage.getDestination().getName()? In the case of queues there is an added complexity: this method returns the name of the topic the message was published on (the topic mapped to the queue we are subscribed to) rather than the queue name.

2) Create a separate JCSMP session (TCP connection) for each topic and queue that we subscribe to? We are looking at about a dozen connections for each of our apps, so about 150 (max) TCP connections to our Solace VPN, which currently allows a maximum of 500 connections.

3) Get all our topics and queues mapped to a single queue (or a few queues) and then dispatch messages based on the value set in bytesXMLMessage.getApplicationMessageType()?

4) Any other, more elegant ways to tackle this?

Many Thanks,
Ali

Answers

  • kov Member, Employee Posts: 3

    Hi Ali, that's quite a bundle of stuff to manage, I think it'll be necessary to break it down into smaller chunks to work through. A couple things to clarify upfront:

    • If you have multiple queues, each queue would get its own msg-handler function per flow, so you would effectively get per-queue dispatching, if that aligns with your patterns
    • TopicDispatch is not supported in the native Java JCSMP library, but it is supported in the Java-RTO (RealTime Optimized) library; not suggesting that for general-compute use cases, it is designed for very low-latency, high-performance apps and supports a Zero-GC model for those that want that
    • I wouldn't normally recommend a separate session per msg-handler as a rule of thumb but if you don't have a lot of connections and are not risking hitting your connection-limits, it would be a benign way to achieve what you are looking for.

    May I ask what is your main motivation for wanting different message-handlers per destination? Is it because of the data types of the messages arriving on those topics? Or is it something about controlling the handler context for each topic or flow? Depending on those reasons, my recommendations would vary.

    Cheers,
    kov

  • alinaqvi Member Posts: 17

    Hey Kov, Thanks for the detailed reply.
    So, as per your reply, I think I misunderstood the documentation for FlowReceiver: it seems that it creates a new receiver without closing the previous one, so multiple handlers for multiple queues should work, although I haven't tested it yet. Should we try using FlowReceiver for topic and SolCache topic subscriptions as well?

    Our current code for queue subscriptions:

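        // Types below are from com.solacesystems.jcsmp.
        Queue queue = JCSMPFactory.onlyInstance().createQueue(queueName);

        ConsumerFlowProperties flowProperties = new ConsumerFlowProperties().setEndpoint(queue);
        // The application acknowledges each message explicitly.
        flowProperties.setAckMode(JCSMPProperties.SUPPORTED_MESSAGE_ACK_CLIENT);
        // Request FLOW_ACTIVE / FLOW_INACTIVE events for this flow.
        flowProperties.setActiveFlowIndication(true);

        FlowReceiver receiver = null;
        try {
            // Each createFlow() call returns its own FlowReceiver bound to the listener passed in.
            receiver = session.createFlow(myXMLMsgListener, flowProperties, null,
                    (Object source, FlowEventArgs event) -> {
                        if (event.getEvent() == FlowEvent.FLOW_ACTIVE) {
                            Log.info(LOG, "Queue = %s Subscribed successfully. ", queueName);
                        } else if (event.getEvent() == FlowEvent.FLOW_INACTIVE) {
                            Log.info(LOG, "Queue = %s Unsubscribed ... this is probably due to a network disconnect. ", queueName);
                        }
                    });
            receiver.start();
        } catch (JCSMPException e) {
            // Error handling was not part of the posted snippet; rethrowing is a placeholder.
            throw new IllegalStateException("Could not bind to queue " + queueName, e);
        }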
    
  • alinaqvi Member Posts: 17

    In a nutshell, we are trying to build a generic interface for interacting with JCSMP from our apps.

    /**
     * Represents one TCP session/connection to the Solace servers.
     */
    public interface SolaceSessionAPI {

        /**
         * Subscribes to the specified Solace topic. Message updates are delivered to the specified listener.
         * @param topic the topic to subscribe to
         * @param listener receives messages published on the topic
         */
        void subscribeSolTopic(String topic, CustomXMLMessageListener listener);

        /**
         * Subscribes to the specified Solace queue. Message updates are delivered to the specified listener.
         * @param queueName the queue to consume from
         * @param listener receives messages delivered from the queue
         */
        void subscribeSolQueue(String queueName, CustomXMLMessageListener listener);

        /**
         * Subscribes to the specified Solace cache topic. Message updates are delivered to the specified listener.
         * @param topic the cache topic to subscribe to
         * @param listener receives cached and live messages for the topic
         * @param cacheEventListener receives cache request events such as onComplete and onException
         */
        void subscribeSolCacheTopic(String topic, CustomXMLMessageListener listener, MyCacheRequestListener cacheEventListener);

    }
    
  • alinaqvi Member Posts: 17

    @kov said:

    May I ask what is your main motivation for wanting different message-handlers per destination? Is it because of the data types of the messages arriving on those topics? Or is it something about controlling the handler context for each topic or flow? Depending on those reasons, my recommendations would vary.

    Cheers,
    kov

    So let's say I am building component1 (a Java app).
    component1 receives OrderCommands from the GUI (topic/for/guicommands) and from the ExchangeGW (topic/for/exchg_gw_commands).
    The GUI can send us a new order-request and the GW can send us an order-fill-response. Both of these are of type OrderCommands, but I need to know whether the msg came from gui or exchg_gw so that I can raise an error in case it is of the incorrect type.

    We also have price updates coming into component1. These are of a completely different type, so I need to route them to the correct deserializer etc.

    So there are umpteen scenarios all over our app suite where we need to know which msg came from which topic.

    Thanks,
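The source check described above could be sketched roughly as follows. This is only an illustration in plain Java with no JCSMP dependency: `CommandKind` and `SourceValidator` are hypothetical names, and it assumes the topic string and the decoded command kind are already available in the message handler.

```java
import java.util.EnumSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical command kinds carried inside an OrderCommands message.
enum CommandKind { NEW_ORDER_REQUEST, ORDER_FILL_RESPONSE }

/** Records which command kinds are acceptable on which source topic. */
final class SourceValidator {
    private final Map<String, Set<CommandKind>> allowedByTopic = new HashMap<>();

    void allow(String topic, CommandKind first, CommandKind... rest) {
        allowedByTopic.put(topic, EnumSet.of(first, rest));
    }

    /** Throws if the command kind is not expected on the topic it arrived on. */
    void check(String topic, CommandKind kind) {
        Set<CommandKind> allowed = allowedByTopic.get(topic);
        if (allowed == null || !allowed.contains(kind)) {
            throw new IllegalArgumentException(kind + " is not expected on " + topic);
        }
    }
}
```

With `topic/for/guicommands` allowed to carry only `NEW_ORDER_REQUEST`, an order-fill-response arriving on that topic would be rejected before any further processing.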

  • kov Member, Employee Posts: 3

    Hi Ali, thanks for the additional context. I thought I'd responded but our community board software seems to have eaten it, so let's try again and my apologies for the delay:

    • FLOWS: FlowReceivers are a PERSISTENT messaging concept; they only apply to persistent endpoints that save messages for you until you consume and acknowledge them

    • CACHES: Cache requests are a DIRECT messaging concept; they do not work with persistence so cache results cannot come through your queue consumers

    I don't think topic-dispatch would have helped you to unify those two flows because of this.

    For type-dispatching, ideally you would have publishers pub a unique schemaID on published messages that consumers could use to dispatch to proper deserializers, etc. The advantage to this is that it would work in any handler, and you could even factor out that dispatch capability into its own component if you wanted.

    This could be in a user-defined property. See TomF's blog post about using header properties. Or there is a 36-byte binary msg-field called UserData that you can access via getters/setters. It could even be the first few bytes of your payload, although the disadvantage of this is that you always need to extract the payload to find the schema type, even if your app would discard messages of schemas it didn't want.

    The question for you is -- can you mandate publishers put a schemaID on their messages identifying the payloads? Or is this outside of your control?
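A minimal sketch of the schemaID-based type dispatch suggested above, in plain Java. `SchemaDispatcher` and the schema IDs used with it are hypothetical; how the ID is read from the message (user property, UserData, or payload prefix) is deliberately left out, and the payload is assumed to be available as a byte array.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Maps a schema ID to the deserializer that understands that payload. */
final class SchemaDispatcher {
    private final Map<String, Function<byte[], ?>> deserializers = new ConcurrentHashMap<>();

    void register(String schemaId, Function<byte[], ?> deserializer) {
        deserializers.put(schemaId, deserializer);
    }

    /**
     * Decodes the payload with the deserializer registered for schemaId.
     * Returns empty for a missing or unknown ID, so unwanted payloads are never parsed.
     */
    Optional<Object> decode(String schemaId, byte[] payload) {
        if (schemaId == null) {
            return Optional.empty();
        }
        Function<byte[], ?> deserializer = deserializers.get(schemaId);
        if (deserializer == null) {
            return Optional.empty();
        }
        Object decoded = deserializer.apply(payload);
        return Optional.of(decoded);
    }
}
```

Because the lookup depends only on the ID and the bytes, the same dispatcher could sit behind a queue flow handler or a direct topic handler.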

  • alinaqvi Member Posts: 17

    Hi Kov,
    I will try to go comment by comment below.

    Thanks for this.

    @kov said:

    Hi Ali, thanks for the additional context. I thought I'd responded but our community board software seems to have eaten it, so let's try again and my apologies for the delay:

    No probs, one of my posts somehow disappeared as well. There is some issue in the forum setup, I guess.

    • FLOWS: FlowReceivers are a PERSISTENT messaging concept; they only apply to persistent endpoints that save messages for you until you consume and acknowledge them

    • CACHES: Cache requests are a DIRECT messaging concept; they do not work with persistence so cache results cannot come through your queue consumers

    I don't think topic-dispatch would have helped you to unify those two flows because of this.

    We don't want to unify the flows. If you look at the SolaceSessionAPI interface example that I pasted above, we are fine with having separate methods for topics, queues and SolCache topics. Ideally we just want to get it done with the minimum number of TCP connections/sessions.

    The question for you is -- can you mandate publishers put a schemaID on their messages identifying the payloads? Or is this outside of your control?

    Changing all the message publishers will probably not be possible for us. I think the best solution for us is to have a custom topic dispatcher for topics and SolCache topics. Maybe we can experiment with Spring Reactor or RxJava etc., or that might be overkill ... not sure yet.

    Thanks,
    Ali
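A custom topic dispatcher of the kind mentioned above might look roughly like this. It is a sketch in plain Java, not part of JCSMP: `TopicDispatcher` is a hypothetical class, and its wildcard handling is a simplified approximation of Solace subscription syntax (`*` as a trailing wildcard within one level, `>` as the last level matching one or more remaining levels). In a single shared message listener, the topic string would come from the message destination name, as in option 1 of the original question.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiConsumer;

/** Routes a message to every handler whose subscription matches the topic. */
final class TopicDispatcher<M> {

    private static final class Route<M> {
        final String[] levels;
        final BiConsumer<String, M> handler;

        Route(String subscription, BiConsumer<String, M> handler) {
            this.levels = subscription.split("/");
            this.handler = handler;
        }
    }

    // Copy-on-write so handlers can be registered while messages are being dispatched.
    private final List<Route<M>> routes = new CopyOnWriteArrayList<>();

    void register(String subscription, BiConsumer<String, M> handler) {
        routes.add(new Route<>(subscription, handler));
    }

    /** Invokes all matching handlers and returns how many were invoked. */
    int dispatch(String topic, M message) {
        String[] topicLevels = topic.split("/");
        int delivered = 0;
        for (Route<M> route : routes) {
            if (matches(route.levels, topicLevels)) {
                route.handler.accept(topic, message);
                delivered++;
            }
        }
        return delivered;
    }

    /** Simplified level-by-level match; an assumption, not the broker's exact rules. */
    static boolean matches(String[] sub, String[] topic) {
        for (int i = 0; i < sub.length; i++) {
            String level = sub[i];
            if (level.equals(">") && i == sub.length - 1) {
                return topic.length > i; // '>' needs at least one remaining level
            }
            if (i >= topic.length) {
                return false;
            }
            if (level.endsWith("*")) {
                String prefix = level.substring(0, level.length() - 1);
                if (!topic[i].startsWith(prefix)) {
                    return false;
                }
            } else if (!level.equals(topic[i])) {
                return false;
            }
        }
        return sub.length == topic.length;
    }
}
```

One session-wide listener could then call `dispatch` for every received message, which keeps the number of TCP connections at one per app. A linear scan over routes is fine for a dozen subscriptions; a larger set might warrant a trie keyed by topic level.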
