Backpressure / message delivery and the Python API

Hi,

We have a non-exclusive durable queue with several consumers (compute workers) that connect using the Python API. Each consumer runs the MessageHandler in a separate thread, which essentially just fills up a buffer with messages; the main thread in the compute worker then consumes the messages and sends out the results.

Initially the issue was that the message buffer would receive messages faster than the compute worker could process them, and because the buffer was by design a leaky FIFO queue, it would start dropping messages. To prevent this, rather than be sensible and implement an infinite-length buffer, I decided to sleep the MessageHandler thread whenever the length of the buffer was over some threshold. This works: the main thread has a chance to catch up and drain the buffer. But rather than delivering the messages to one of the other available consumers, Solace simply holds the messages until this consumer is available again and then delivers them to it. This is annoying, as you can easily end up with one particular consumer having loads of messages to process whilst the others sit idle.
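
For reference, the "sleep the handler when the buffer is full" idea can be sketched with a bounded `queue.Queue` from the standard library, whose `put()` blocks while the buffer is full. This is only a stand-in under stated assumptions: `on_message`, `BUFFER_LIMIT` and the fake message source are hypothetical names and not part of the Solace Python API. It only replaces the sleep loop and has the same broker-side effect described above.

```python
import queue
import threading
import time

BUFFER_LIMIT = 5  # hypothetical threshold, pick to suit the worker

# Bounded FIFO buffer: put() blocks instead of dropping when it is full.
buffer = queue.Queue(maxsize=BUFFER_LIMIT)


def on_message(msg):
    """Stand-in for the MessageHandler callback (hypothetical name)."""
    buffer.put(msg)  # blocks the handler thread until there is room


def handler_thread(n):
    """Fake message source that stands in for the broker delivering n messages."""
    for i in range(n):
        on_message(i)
    buffer.put(None)  # sentinel: no more messages


def main_loop():
    """Main thread of the compute worker: drain the buffer and 'process'."""
    processed = []
    while True:
        msg = buffer.get()
        if msg is None:
            break
        time.sleep(0.001)  # simulated work, slower than the producer
        processed.append(msg)
    return processed


def run(n=50):
    t = threading.Thread(target=handler_thread, args=(n,))
    t.start()
    processed = main_loop()
    t.join()
    return processed
```

Calling `run(50)` processes every message in order without dropping any, even though the producer is faster than the consumer.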

I have thought of disconnecting rather than just sleeping the thread, but this seems a bit extreme.


What I would like to do is handle cases where, say, there are 100 messages to process and 2 consumers. Normally each consumer would get 50 messages. But say the first message for consumer 1 takes 100 times longer to process than the messages for consumer 2: I don't want consumer 2 sitting there doing nothing after it has processed its 50 messages while consumer 1 still has 49 to process. I want Solace to reassign the 49 messages to consumer 2. I guess one way of achieving this is to connect, disconnect as soon as you get a single message, process that message, then reconnect. Would this even work? Is there a better way?


Many thanks

Alexander

Answers

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 923 admin

    Hi @AlexanderJHall,

    Depending on your performance requirements, I think the simplest way to tune this would be to set "Maximum Delivered Unacknowledged Messages per Flow" on your queue to be equal to the number of workers in your apps. This way the broker will only send your app the maximum number of messages that it can be processing at any given time. This will essentially make message acknowledgements control the message flow: each app would get another message for each one that it acknowledges.
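
To illustrate why an acknowledgement-driven window helps, here is a toy discrete-event simulation of the 100-message, 2-consumer scenario from the question. It is a simplified model written for this thread, not Solace code: `simulate`, `window` and `cost` are hypothetical names, `window` plays the role of the per-flow unacknowledged-message limit, and each consumer is assumed to acknowledge a message only after it has finished processing it.

```python
import heapq
from collections import deque


def simulate(num_messages, cost, num_consumers=2, window=1):
    """Toy broker: each consumer may hold at most `window` unacked messages.

    cost(consumer, msg) returns the processing time of msg on that consumer.
    Returns (messages processed per consumer, total elapsed time).
    """
    backlog = deque(range(num_messages))             # still on the queue
    held = [deque() for _ in range(num_consumers)]   # delivered, not started
    in_flight = [0] * num_consumers                  # delivered, not acked
    busy = [False] * num_consumers
    done = [[] for _ in range(num_consumers)]
    events = []                                      # (finish_time, consumer, msg)
    now = 0

    def deliver():
        # Round-robin over consumers that still have room in their window.
        progress = True
        while backlog and progress:
            progress = False
            for c in range(num_consumers):
                if backlog and in_flight[c] < window:
                    held[c].append(backlog.popleft())
                    in_flight[c] += 1
                    progress = True

    def start_work(t):
        # Idle consumers pick up the next message they already hold.
        for c in range(num_consumers):
            if not busy[c] and held[c]:
                msg = held[c].popleft()
                busy[c] = True
                heapq.heappush(events, (t + cost(c, msg), c, msg))

    deliver()
    start_work(now)
    while events:
        now, c, msg = heapq.heappop(events)
        busy[c] = False
        in_flight[c] -= 1        # the ack frees one slot in the window
        done[c].append(msg)
        deliver()
        start_work(now)
    return done, now


def slow_first(consumer, msg):
    """Message 0 takes 100 time units, every other message takes 1."""
    return 100 if msg == 0 else 1
```

With `window=50` the model reproduces the problem: `simulate(100, slow_first, window=50)` splits the work 50/50 and the second consumer sits idle while the first works through its share. With `window=1`, the slow consumer only ever holds the one message it is working on, so the other consumer picks up the rest of the backlog.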

  • AlexanderJHall
    AlexanderJHall Member Posts: 6

    Hi @marc ,

    Thanks very much for the quick reply!

    "Maximum Delivered Unacknowledged Messages per Flow" sounds good when you know in advance how many workers / consumers you have. However, in this situation the number of workers / consumers can be scaled up / down depending on conditions. Is there another way to control the flow that would be responsive to workers / consumers entering or leaving the queue?

    Many thanks!

    A