Sending Ack with multiple flows

@Ragnar as per the conversion here is fresh discussion.

Hello, I am new to solace and I have a question about having multiple flows.

Consider the following situation:

A consumer is consuming messages from 2 queues. Then the consumer will create 2 flows.

Here the consumer is not just consuming the messages, it also stores/processes the messages. And it would be ideal to send acknowledgement only when the consumer successfully stores/processes the message.

Now that you have a consumer, consuming from multiple queues then you need to know the flow instance at the time of sending an acknowledgement.

How to get flow instances? Or at least know which flow is used when the message was consumed? 

I am using .NET library 10.17.0

Thanks

Comments

  • Ragnar
    Ragnar Member, Employee Posts: 67 Solace Employee

    Thanks for posting this question.

    It is necessary for the application to track which messages belong to which flows when consuming from multiple flows on a non-transacted session. The most straightforward way to accomplish this is to create your own application object that contains both an IMessage reference and an IFlow reference.

    When processing a received message in your messageEvent callback, instantiate your AppMessage object and save both the IFlow and IMessage there. In your app you would pass around and generally use AppMessage(), and when your ready to acknowledge, the IFlow reference you've saved can be used.

    This would be the solution for non-transacted sessions.

    An alternate approach, especially if the messages you are consuming are related and must be handled together is to create a ITransactedSession, then create your flows on the ITransactedSession instead. In those mode, all received messages are received in a transaction. You do not have to acknowledge individual messages, once you've received and processed both messages, simply call ITransactedSession.commit() to acknowledge both messages. With this approach you must use IFlow.ReceiveMsg() to consume messages on each flow as this is the only way to guaranteed which messages are being committed when commit() is called.

    Ragnar

  • laadpiyush
    laadpiyush Member Posts: 5

    Thank you for the reply @Ragnar

    Storing IFlow and IMessage in an object was my original plan but in the messageEvent callback, I don't see anything that can give IFlow instance. If you can shed some light on how to get the IFlow instance in the messageEvent callback then that would be awesome. (We are using .NET core library 10.17.0)

    Meanwhile, I will think about if transacted sessions are appropriate for the use case or not.

    We want to implement a sticky load balancing pattern as mentioned in https://solace.com/blog/consumer-groups-consumer-scaling-solace/

    We have a sensor that collects high-frequency time-series data and publishes it to the Solace broker. In the current test implementation, we have 4 queues and 2 consumers. Each consumer is connected to two different queues, consuming the data, processing it and then sending an acknowledgement. That's where multiple flows came into the picture.

  • Ragnar
    Ragnar Member, Employee Posts: 67 Solace Employee

    Hello,

    In the registered EventHandler:

    public delegate void EventHandler<TEventArgs>(object sender, TEventArgs e);
    

    The 'sender' is the object that you registered the EventHandler with. So if you pass the EventHandler to ISession.createFlow() then when it is invoked later for a received message, the 'sender' is the IFlow.

    I can't find this explicitly stated anywhere in the online reference. I will make a note to the developer and docs team that some improvements could be made here.

    Ragnar

  • laadpiyush
    laadpiyush Member Posts: 5

    That worked, fantastic.

    Yeah, there is no documentation saying the sender is actually a flow.

    Thank you so much @Ragnar

This Week's Leaders