Persistent receiver with subscription-specific listeners

mystarrocks
mystarrocks Member Posts: 11
edited February 2022 in PubSub+ Event Broker #1

Hi,

Is it possible to have multiple topic-specific receivers and handlers to consume messages from a queue? Like:

var receiver1 = messagingService.createPersistentMessageReceiverBuilder()
        .withMessageAutoAcknowledgement()
        .withSubscriptions(topicSet1)
        .build(queue)
        .start();
receiver1.receiveAsync((message) -> {
    //... handle messages received from queue thru topic subscriptions in topicSet1
});

var receiver2 = messagingService.createPersistentMessageReceiverBuilder()
        .withMessageAutoAcknowledgement()
        .withSubscriptions(topicSet2)
        .build(queue)
        .start();
receiver2.receiveAsync((message) -> {
    //... handle messages received from queue thru topic subscriptions in topicSet2
});

I could, of course, have a common receiver and handler, then delegate messages based on the topic strings (if it's available as a property) or payload data, or achieve the same thru selectors using "withMessageSelector", but I was wondering if it's achievable thru topics.
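For reference, the "common receiver and handler, then delegate" approach can be sketched in a few lines. This is only an illustration written in plain Python so it runs without a broker: `topic_matches` and `TopicDispatcher` are hypothetical names, and the matcher is a simplified take on Solace-style wildcards (`*` for one level or a level prefix, a trailing `>` for one or more levels), not the broker's actual matching code.

```python
from typing import Callable, List, Tuple

Handler = Callable[[str, str], None]


def topic_matches(subscription: str, topic: str) -> bool:
    """Simplified Solace-style match: '*' matches one whole level (or a level
    prefix such as 'ord*'); a trailing '>' matches one or more levels."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, sub in enumerate(sub_levels):
        if sub == ">" and i == len(sub_levels) - 1:
            # '>' is only a wildcard in last position and needs >= 1 level.
            return len(topic_levels) > i
        if i >= len(topic_levels):
            return False
        level = topic_levels[i]
        if sub.endswith("*"):
            if not level.startswith(sub[:-1]):
                return False
        elif sub != level:
            return False
    return len(sub_levels) == len(topic_levels)


class TopicDispatcher:
    """Hypothetical helper: routes a (topic, payload) pair to every handler
    whose subscription matches the topic."""

    def __init__(self) -> None:
        self._routes: List[Tuple[str, Handler]] = []

    def register(self, subscription: str, handler: Handler) -> None:
        self._routes.append((subscription, handler))

    def dispatch(self, topic: str, payload: str) -> int:
        """Invoke matching handlers; return how many were called."""
        called = 0
        for subscription, handler in self._routes:
            if topic_matches(subscription, topic):
                handler(topic, payload)
                called += 1
        return called


seen = []
dispatcher = TopicDispatcher()
dispatcher.register("topic1/>", lambda t, p: seen.append(("handler1", t)))
dispatcher.register("topic2/>", lambda t, p: seen.append(("handler2", t)))
dispatcher.dispatch("topic1/test/etc", "hello")
dispatcher.dispatch("topic2/test/etc", "world")
print(seen)
```

In a real consumer, the single message handler registered on the receiver would presumably call `dispatcher.dispatch(...)` with the destination name and payload taken from the inbound message; that wiring is left out here.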

On a related note, is "withSubscriptions" the programmatic equivalent of adding topic subscriptions to a queue in the event broker administratively? That is, once a subscription has been added to a queue programmatically or administratively, will subsequent calls to "withSubscriptions" for the same subscriptions fail because the topic subscriptions are already on the queue? If so, when and how is this approach preferable to adding/removing them directly in the event broker?

Also, how will this work if a message matches multiple topic subscriptions (or selectors, if that's how we achieve topic-relevant receivers/handlers) and so multiple receivers/handlers per my design — will it be received and handled in round-robin fashion by the listeners as long as the queue is non-exclusive?

Best Answer

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 665 admin
    #2 Answer ✓

    Hey there @mystarrocks..! Thanks for that very detailed and well-thought-out question. I have to admit, I'm not 100% sure on multiple dispatches-per-queue. Short answer: I'm pretty sure no. If you really want to have multiple topic subscriptions added to a single queue, and have those get different callbacks, I think you have to roll your own. I'll go double-check anyhow.

    But I wanted to respond to some of your other points/comments as I think they may be important? Maybe you know all this already..!?

    Topics and Selectors in Solace are very different things, in terms of how routing is achieved and performance and such. As a rough analogy: Topics are like an IP packet dest header, and subscriptions are like a subnet mask; Selectors are more like looking inside the packet for user data or payload info to help do filtering. Topic matching is MUCH faster in the broker than Selectors, so if it's possible to publish your messages with descriptive, multi-level topics that don't need Selectors, that's good.

    Why not have two queues, each with their own subscription?

    Having the (messaging) API add subscriptions to a queue is a little bit different than administratively adding them. Biggest diff is permissions: a client app needs to either be the "owner" of a queue, or the queue's permissions must be set to at least "modify topics". Often, in production, broker administrators will lock down client permissions (so a rogue app can't go around creating 100s of random queues), pre-configure all queues with their required topics beforehand, and then apps just connect/bind to queues (queue permission == consume) and get what's in them. That's generally considered safer than letting apps create queues and/or modify topic subscriptions on queues as they fancy.

    In terms of overlapping topic subscriptions (or selectors) on the same endpoint, you'll only get one message. If you have two queues, bound to each, with overlapping subscriptions, then it's possible to get double messages.

    Hope that helps! I'm assuming you are already playing with the Java API? What have been your observations so far?

Answers

  • mystarrocks
    mystarrocks Member Posts: 11
    edited February 2022 #4

    Hey @Aaron, thank you for taking the time to review and respond. Love your video tutorials on YouTube!

    Regarding multiple dispatches-per-queue: I figured as much. The goal, again, is to avoid central handlers with if-else style delegations to other handlers in the code if possible, but I guess I could achieve that with separate queues, one per subscription.

    In my tests, I was able to have multiple consumers connected to a queue (with at most one active flow at any given moment, of course) where, as long as my listeners had the right selectors with no overlap, I was able to route messages, based on the selector value, to the right listener within a consumer application. Since the topic itself is a property on the message, I was wondering if I could leverage that for the internal routing within a consumer, but apparently not.

    As for topic v. selectors, yes, I am aware of the major differences; I personally prefer the handbag analogy. 😀 If the message were a handbag, then the topic would be a sticker on the handbag for easy identification, while the message properties would be items in the front pocket, and the payload itself would be items in the main storage area. Naturally, it should be much easier to route bags by looking at the labels than checking the front pocket for some specific information. And, you are probably right: for Solace to route messages easily and effectively to the destinations, topics may be a better choice than selectors, ingress or egress. But once a message has been routed to a destination and persisted, any smart internal routing on a consumer — based on my tests — involved using selectors, so I intend to have both hierarchical topics (EDA FTW!) on the messages when publishing as well as have selectors on the consumers, although multiple queues approach sounds like an even better solution.

    Good point about the permissions; the clients are likely going to be restricted from adding/removing subscriptions, so I don't see withSubscriptions being used that much when setting up receivers thru the pubsubplus Java API.

    I definitely like the Java API so far. Previously, I used Spring and JCSMP libraries, but the pubsubplus API is more intuitive and offers a consistent usage experience across languages, so I am starting to use this one more.

  • Tamimi
    Tamimi Member, Administrator, Employee Posts: 543 admin
    edited February 2022 #5

    Hey @mystarrocks - thanks for the question! You are correct on the following:

    "withSubscriptions" is the programmatic equivalent of adding topic subscriptions to a queue in the event broker administratively

    As long as the queue is configured to have the "non-owner permission" property to modify topics as per Aaron's comment and the screenshot below

    To answer your question regarding the different queues, I have put together some sample code in Python to check it out. Note that since the PubSub+ Messaging APIs for Java and Python behave similarly, you will get the same behaviour with the new Java API as well.

        # Build receiver 1 and bind it to the queue
        persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\
            .with_subscriptions([TopicSubscription.of("topic1/>")])\
            .build(queue)
        persistent_receiver.start()

        # Callback for messages received on receiver 1
        persistent_receiver.receive_async(MessageHandlerImpl())

        # Build receiver 2 and bind it to the same queue
        persistent_receiver2: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\
            .with_subscriptions([TopicSubscription.of("topic2/>")])\
            .build(queue)
        persistent_receiver2.start()

        # Callback for messages received on receiver 2
        persistent_receiver2.receive_async(MessageHandlerImpl2())
    

    and I have defined queue in three different ways:

    1. queue = Queue.durable_exclusive_queue("durable-exclusive-queue"): Since the queue is a durable exclusive queue, when I publish a message on either topic1/test/etc or topic2/test/etc only receiver 1 will receive the message.
    2. queue = Queue.durable_non_exclusive_queue("durable-non-exclusive-queue"): messages will be received by the two receivers in round-robin fashion, regardless of which topic they were published on.
    3. queue = Queue.non_durable_exclusive_queue("non-durable-queue"): This will in fact result in an error! Take a guess why and what the exception is. spoiler below 👇

    The exception raised is

    {'caller_description': 'flow topic add sub ', 'return_code': 'Fail', 'sub_code': 'SOLCLIENT_SUBCODE_MAX_CLIENTS_FOR_QUEUE', 'error_info_sub_code': 63, 'error_info_contents': 'Max clients exceeded for queue'}

    and the reason is that you can only have one non-durable queue per client.

    Note: this assumes the queues already exist on the broker.

    Hope this explanation helps!

    And P.S., I really like your handbag analogy 😅

  • mystarrocks
    mystarrocks Member Posts: 11

    Hi @Tamimi, thank you — your explanation definitely helps!

    In my case as well, I expect the topic subscriptions to be configured on the queues administratively ahead of time, so withSubscriptions at runtime is not really an option if it's going to try adding those subscriptions as well. Besides, the non-owner permission on our queue is going to be set to Consume, so clients modifying topics programmatically is out of the question. I was only exploring that as I [wrongly] interpreted withSubscriptions as a way to internally match the messages to receivers based on topic subscriptions. I ran a test similar to yours and realized that's not the case.

    In your example, the queue definition #2 (a durable non-exclusive queue) is what I would be going for, but even with that, there are no guarantees that:

    • persistent_receiver will receive only messages matching its subscription: topic1/>
    • persistent_receiver2 will receive only messages matching its subscription: topic2/>

    I was hoping it would work that way so I could have topic-bound receivers, but that's because I'd wrongly assumed how withSubscriptions would work — any receiver calling withSubscriptions simply registers specified topic subscriptions (if they are permitted to and the subscriptions don't already exist) on the queue, that's all. My test proved this as well, so I was wondering if there's any other way to achieve what I want. Turns out I could use withMessageSelector (in fact, I could have selectors based on the topic value itself as it's also a property on the message) or one queue for every receiver/handler if I wanted to avoid a central handler with topic-based delegation in the code that badly.
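The behaviour discussed in this thread can be condensed into a small toy model. This is a plain-Python sketch under stated assumptions, not how the broker is implemented: `ToyQueue` and `matches` are hypothetical names, only exact topics and trailing `/>` wildcards are modelled, at least one receiver is assumed to be bound, and real round-robin delivery on a non-exclusive queue is not guaranteed to alternate this strictly. It shows that subscriptions end up on the queue rather than on the receiver that added them, that an exclusive queue feeds only the first bound receiver, and that overlapping subscriptions on one queue still yield a single copy of a message.

```python
from typing import Dict, List, Set


def matches(subscription: str, topic: str) -> bool:
    # Only exact topics and trailing "/>" wildcards are modelled here.
    if subscription.endswith("/>"):
        prefix = subscription[:-1]  # keep the trailing "/"
        return topic.startswith(prefix) and len(topic) > len(prefix)
    return subscription == topic


class ToyQueue:
    """Toy model of a queue endpoint (hypothetical, for illustration only)."""

    def __init__(self, exclusive: bool) -> None:
        self.exclusive = exclusive
        self.subscriptions: Set[str] = set()
        self.receivers: List[str] = []
        self.delivered: Dict[str, List[str]] = {}
        self._next = 0

    def bind(self, receiver: str, subscriptions: List[str]) -> None:
        # Subscriptions are attached to the queue itself, not to the receiver.
        self.subscriptions.update(subscriptions)
        self.receivers.append(receiver)
        self.delivered[receiver] = []

    def publish(self, topic: str) -> None:
        # A message is spooled at most once, however many subscriptions match.
        if not any(matches(s, topic) for s in self.subscriptions):
            return
        if self.exclusive:
            target = self.receivers[0]  # only the first bound flow is active
        else:
            target = self.receivers[self._next % len(self.receivers)]
            self._next += 1
        self.delivered[target].append(topic)


def run(exclusive: bool) -> Dict[str, List[str]]:
    queue = ToyQueue(exclusive)
    queue.bind("receiver1", ["topic1/>"])
    queue.bind("receiver2", ["topic2/>"])
    for topic in ["topic1/a", "topic1/b", "topic2/a", "topic2/b"]:
        queue.publish(topic)
    return queue.delivered


exclusive_result = run(exclusive=True)
shared_result = run(exclusive=False)

# Overlapping subscriptions on the same queue: still one copy of the message.
overlap = ToyQueue(exclusive=False)
overlap.bind("receiver1", ["topic1/>", "topic1/a/>"])
overlap.publish("topic1/a/x")
overlap_count = sum(len(msgs) for msgs in overlap.delivered.values())

print(exclusive_result)
print(shared_result)
print(overlap_count)
```

In the non-exclusive run, each receiver ends up with a mix of topic1 and topic2 messages, which is why topic-bound handlers need selectors, one queue per handler, or delegation inside a common handler.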