
Multiple consumers in .NET not working in parallel despite having a non-exclusive queue.

nomassama Member Posts: 11 ✭✭

Hi, I am having an issue with a Solace consumer that I have implemented in .NET. I have 10 consumers (my understanding is that each consumer means a separate flow for messages).

With this information I created multiple flows and assigned them to separate threads/tasks. After sending a burst of 5000 messages, I did not see many consumers pulling messages at the same moment; only one consumer was doing so.

10 consumers
Tested with a non-exclusive queue with partition count 0.
Tested with a non-exclusive queue with partition count 10. (Each consumer took an equal share of messages, but they did not seem to work concurrently with the others.)




Answers

  • nicholasdgoodman Member, Employee Posts: 43 Solace Employee
    #4 Answer ✓

    Greetings! Your understanding of how it should work is correct, so let's dig a little deeper and see why you are not observing the expected behavior.

    First, typically when we say "consumer" we are referring to multiple processes, microservices, or devices that are pulling messages from a queue. However, you are correct that if a single application creates multiple flows, it will also behave as multiple consumers.

    Now, for a non-partitioned queue (a non-exclusive queue with a partition count of 0) there is little to no benefit to creating multiple flows on a single session instance, and even in the case of partitioned queues, the only side effect of having multiple flows to the same queue in a single process would be how many partitions are assigned.

    The crux of your problem likely lies in how you are dispatching the messages to their worker threads after they are received. You mention that you "assigned them to separate threads/tasks", but can you provide more clarity or code samples to show how you did that?

    Note that all code which executes within your EventHandler<MessageEventArgs> messageEventHandler callback initially runs on the same dispatcher thread for all flows, so in order to achieve parallel processing, it is necessary to push any received messages to their respective processing threads as fast as possible.
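
    For example, one quick way to confirm this is to log the current managed thread ID inside each consumer's handler. A minimal sketch (the consumerId field is purely an illustrative label, not part of the Solace API):

    private void HandleMessageEvent(object source, MessageEventArgs args)
    {
        // Logs which thread the API used to dispatch this message event; with multiple
        // flows on a single session, every consumer typically reports the same thread ID.
        Console.WriteLine($"Consumer {consumerId}: received on thread {Thread.CurrentThread.ManagedThreadId}");
    }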

  • nomassama Member Posts: 11 ✭✭
    edited September 4 #5

    Thanks for the in-depth analysis. So let me share a snippet of how I have implemented this.

    public class Consumer
    {
        private IFlow Flow;
        private readonly ISession Session;
        FlowProperties FlowProperties;
        IEndpoint Endpoint;
        ISubscription Subscription;

        public Consumer(ISession pSession, FlowProperties pFlowProperties, IEndpoint pEndPoint, ISubscription pSubscription);

        private void HandleMessageEvent(object source, MessageEventArgs args);
    }


    I have a list of Tasks where each consumer is allocated and initialized as a separate object. So say I have 10 Tasks, each Task bound to its own Consumer object. Pardon my French (.NET), but I believed that since each flow object is isolated from the others and has its own HandleMessageRequest() to call, they would run independently; however, upon running it behaved differently. I also assigned a consumer ID to each consumer, and here is the result.



    My use case is very simple right now: 20 threads/consumers on a non-exclusive queue, all competing.
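
    Roughly, the consumers are started like this (a simplified sketch rather than the exact code; session, flowProperties, endpoint, and subscription are created elsewhere):

    var consumers = new ConcurrentBag<Consumer>();
    var tasks = Enumerable.Range(0, 20)
        .Select(_ => Task.Run(() =>
        {
            // Each Task constructs its own Consumer, i.e. its own flow on the shared session
            consumers.Add(new Consumer(session, flowProperties, endpoint, subscription));
        }))
        .ToArray();
    Task.WaitAll(tasks);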

    One more query: I have looked at the concept of partitions a little. Would it make sense, at least initially, to create 20 partitions if I have 20 threads?

    for a non-partitioned queue (a non-exclusive queue with a partition count of 0) there is little to no benefit to creating multiple flows on a single session instance, and even in the case of partitioned queues, the only side effect of having multiple flows to the same queue in a single process would be how many partitions are assigned.

  • nicholasdgoodman Member, Employee Posts: 43 Solace Employee
    edited September 4 #6 Answer ✓

    Yes, so even if you create these Consumer object instances on separate threads, their respective handler events will fire on the same thread (as verified in your provided logs screenshot). This is because in .NET, objects are not bound to a specific thread regardless of how they are instantiated, and because, at the networking layer, the Solace client library initially dispatches all messages on a shared thread to ensure messages (and client events) are handled in order.

    If you wish for each consumer instance to process messages separately from the others, there are numerous patterns you can use (such as Task chaining or Channels), but here is a super-simple, older-style approach which spins up a thread instance per consumer:

    private IFlow flow;
    private BlockingCollection<MessageEventArgs> messageEvents;
    private Thread processThread;

    public QueueConsumer()
    {
        messageEvents = new BlockingCollection<MessageEventArgs>();

        // Other startup and initialization logic such as creating the IFlow instance…
        // flow = …

        processThread = new Thread(new ThreadStart(ProcessMessage))
        {
            IsBackground = true
        };
        processThread.Start();
    }

    void HandleMessageEvent(object source, MessageEventArgs args)
    {
        messageEvents.Add(args);
    }

    private void ProcessMessage()
    {
        while (true)
        {
            var args = messageEvents.Take();

            // Process the message here...

            // Ack message when done
            flow.Ack(args.Message.ADMessageId);
        }
    }

    This example takes messages as they are received and sends them to a BlockingCollection<T> instance where they are picked up on a dedicated and persistent processing thread.
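
    For comparison, here is a minimal sketch of the Channels approach mentioned above (using System.Threading.Channels; it assumes the same flow field and handler wiring as the example, and the member names are illustrative):

    private readonly Channel<MessageEventArgs> messageChannel = Channel.CreateUnbounded<MessageEventArgs>();

    void HandleMessageEvent(object source, MessageEventArgs args)
    {
        // Hand the event off to the channel and return immediately on the dispatcher thread
        messageChannel.Writer.TryWrite(args);
    }

    private async Task ProcessMessagesAsync()
    {
        // Consume messages on a dedicated task instead of a raw thread
        await foreach (var args in messageChannel.Reader.ReadAllAsync())
        {
            // Process the message here...
            flow.Ack(args.Message.ADMessageId);
        }
    }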

    Regarding your question on partitions: yes, you should never have more consumers (devices, processes, or flows) than the number of partitions unless you want the extras to sit in standby mode should another consumer go down.

  • nomassama Member Posts: 11 ✭✭

    Thanks, that helps a lot.
