How to implement task queue using Solace

balast
balast Member Posts: 8
edited November 2022 in General Discussions #1

Hi, I would like to use Solace as a task queue. Solace may be overkill for my needs, but I hope it can meet my use case.

I have 2 publishers putting guaranteed messages on a queue and, initially, a single consumer listening to the queue, but the number of consumers scales up when there are many messages on the queue. When a consumer receives a task, it runs a function that takes ~1 minute.

I would like each consumer to take a single message from the queue and only take the next one once the function that processes the message is done (~1 minute later). I would like the consumers to listen for messages asynchronously to avoid the delays that come with polling. The reason I want each consumer to take a single task is that when a new consumer is spun up, I want it to be able to start processing the next message, rather than having that message wait on the pre-existing consumer until it finishes processing its first message.

Is this possible? I'm using the Python client, if that's helpful. Thanks in advance.

Best Answer

  • balast
    balast Member Posts: 8
    #2 Answer ✓

    For any others who find this later, the solution mentioned in https://stackoverflow.com/a/68626352/9848141 seems promising. I'll paste the relevant portion here for reference.

    "You can limit the number of messages sent to your application at a time by adjusting both the Guaranteed Message Window Size in your API properties, and the max-delivered-unacked-messages-per-flow parameter for your queue on your Solace event broker. This will control how many messages are delivered to the API before receiving either a transport acknowledgement, or application acknowledgement."

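To illustrate why the per-flow limit quoted above matters for a task queue, here is a toy simulation in plain Python. It does not use the Solace API: `ToyQueue`, `late_joiner_share`, and the consumer names are hypothetical, and the real broker's dispatch order across consumers may differ. It only models a cap on delivered-but-unacknowledged messages per consumer.

```python
from collections import deque


class ToyQueue:
    """Toy model of a queue with a per-consumer cap on delivered, unacked messages."""

    def __init__(self, max_unacked_per_flow):
        self.max_unacked = max_unacked_per_flow
        self.pending = deque()   # messages not yet delivered to anyone
        self.in_flight = {}      # consumer name -> delivered, unacked messages

    def publish(self, message):
        self.pending.append(message)
        self._dispatch()

    def bind(self, consumer):
        self.in_flight[consumer] = []
        self._dispatch()

    def ack(self, consumer, message):
        self.in_flight[consumer].remove(message)
        self._dispatch()

    def _dispatch(self):
        # Hand out pending messages while any consumer has room under the cap.
        progressed = True
        while self.pending and progressed:
            progressed = False
            for unacked in self.in_flight.values():
                if self.pending and len(unacked) < self.max_unacked:
                    unacked.append(self.pending.popleft())
                    progressed = True


def late_joiner_share(max_unacked_per_flow):
    """Return what a consumer receives when it joins after a backlog has formed."""
    queue = ToyQueue(max_unacked_per_flow)
    queue.bind("consumer-1")
    for n in range(1, 5):
        queue.publish(f"task-{n}")
    queue.bind("consumer-2")  # scaled up later
    return queue.in_flight["consumer-2"]


print(late_joiner_share(1))   # cap of 1: the new consumer picks up task-2 right away
print(late_joiner_share(10))  # large cap: consumer-1 already holds every task
```

With a cap of 1, the backlog stays on the queue until someone has capacity, so a newly started consumer can begin work immediately. With a large cap, the first consumer has already been handed the whole backlog and the new consumer sits idle.
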
Answers

  • Tamimi
    Tamimi Member, Administrator, Employee Posts: 541 admin

    Hey @balast - if I understood your scenario correctly, you will need to take advantage of manual acks from the client, together with the window size. Your consumer will only send an ack once it's done processing the message. As you can see in this sample, a persistent message receiver is built with auto acknowledgement configured. You don't want to do that. Instead, send a client ack in the MessageHandler after successfully processing the message.
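
The ack-after-processing pattern described here can be sketched as follows. This is a minimal, broker-free illustration: `AckAfterProcessingHandler`, `FakeReceiver`, and `run_task` are hypothetical names, and the only assumption about the real client is that the persistent receiver exposes an `ack(message)` method and calls the handler's `on_message`. Note that this controls when the broker may discard a message; it does not by itself cap how many messages are delivered to the client.

```python
import time


class AckAfterProcessingHandler:
    """Acknowledge a message only after its task has finished successfully.

    `receiver` is assumed to expose ack(message); `process` is a
    hypothetical task function supplied by the application.
    """

    def __init__(self, receiver, process):
        self._receiver = receiver
        self._process = process

    def on_message(self, message):
        try:
            self._process(message)
        except Exception as exc:
            # No ack is sent, so the broker keeps the message for redelivery.
            print(f"processing failed, message left unacknowledged: {exc}")
            return
        self._receiver.ack(message)


class FakeReceiver:
    """Stand-in for a real receiver so the sketch runs without a broker."""

    def __init__(self):
        self.acked = []

    def ack(self, message):
        self.acked.append(message)


def run_task(message):
    if message == "bad":
        raise ValueError("cannot process")
    time.sleep(0.01)  # stands in for the ~1 minute function


receiver = FakeReceiver()
handler = AckAfterProcessingHandler(receiver, run_task)
for msg in ["task-1", "bad", "task-2"]:
    handler.on_message(msg)
print(receiver.acked)  # only the successfully processed messages were acked
```

In a real application the handler would be registered with the receiver's asynchronous receive call, and the receiver would be built without auto acknowledgement.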

  • balast
    balast Member Posts: 8

    Thanks Tamimi for your quick response. I tried your suggestion, but it didn't work. By your logic, if I turn off auto acknowledgements and never send the acknowledgement, then a subscriber will receive a single message and no more, right? It's pretty quick to see that this isn't the case with a small modification to the guaranteed_subscriber.py example file.

    What I see is that the subscriber receives many messages (I only want it to receive one) and then processes them serially. Once that subscriber process is killed, those messages begin to be sent to other subscribers (if any), since they were never acknowledged.
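
The behaviour described above can be mimicked with a small toy model. It is not the Solace API: `deliver`, `release`, and the window of 3 are hypothetical, chosen only to show that withholding acks does not stop delivery, and that unacknowledged messages become available to another subscriber once the first one goes away.

```python
from collections import deque


def deliver(queue, window):
    """Hand a subscriber up to `window` messages; they remain unacked."""
    return [queue.popleft() for _ in range(min(window, len(queue)))]


def release(queue, unacked):
    """Model a killed subscriber: its unacked messages return to the queue head."""
    queue.extendleft(reversed(unacked))


queue = deque(f"msg-{n}" for n in range(1, 6))

sub_a = deliver(queue, window=3)  # subscriber A never acks, yet holds 3 messages
release(queue, sub_a)             # A is killed before acknowledging anything
sub_b = deliver(queue, window=3)  # subscriber B now receives the same messages

print(sub_a)
print(sub_b)
```

Both lists contain the same three messages, which matches the observation that manual acknowledgement alone does not limit a subscriber to one message at a time.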

    I appreciate the help, and I look forward to any other suggestions you might have.
