Reprocess messages that were not acknowledged by the consumer
Hello There,
I'm using the JCSMP API.
This might be a repetitive query; I'd appreciate it if you could point me to a solution.
1. I am using a Guaranteed Delivery queue, publishing messages with 'DeliveryMode.PERSISTENT'.
2. I have a single consumer for this queue:
endpoint_props.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
cons = session.createFlow(myXMLMessageListener, flow_prop, endpoint_props);
3. Messages arrive on the queue, the consumer reads and processes them, and once processing has completed successfully the message is acknowledged. All good...
(The issue is below)
4. There are 5 messages on the queue:
- Message 1 is processed normally.
- Message 2 is consumed by the consumer, but an application issue occurs during processing and the message is NOT ACKNOWLEDGED.
- Messages 3, 4 and 5 are consumed, processed normally and acknowledged.
- MESSAGE 2 is still on the queue and has not been reprocessed.
- To reprocess message 2, the application has to be restarted (basically re-establishing the queue connection).
--> What needs to be done to reprocess message 2 without restarting the application?
Answers
-
Hi @Ashok, the basic pattern here is that your application should keep hold of the message until it has been successfully acknowledged. So let's say you receive a message, perform some data processing and acknowledge the message. If the data processing fails, provided you still have a copy of the message you can perform whatever business logic you need to - for instance reprocessing the data.
You should only remove references to the message object once you're satisfied that acknowledge has been called successfully.
0 -
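The hold-until-acknowledged pattern described above can be sketched roughly as follows. This is a minimal illustration, not Solace code: `AckableMessage`, `HoldUntilAcked` and `processWithRetry` are hypothetical names, and the interface only stands in for a received message whose acknowledgement you control (with JCSMP in client acknowledgement mode, that role is played by `ackMessage()` on the received message). The retry count and linear backoff are assumptions chosen for the example.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for a received guaranteed message (not a Solace type). */
interface AckableMessage {
    String payload();
    void ack(); // with JCSMP in client-ack mode this role is played by ackMessage()
}

final class HoldUntilAcked {

    /**
     * Keeps the message reference and retries processing until it succeeds or
     * maxAttempts is exhausted. The ack is sent only after a successful attempt.
     * Returns true if the message was acked, false if it is still unacked.
     */
    static boolean processWithRetry(AckableMessage msg, Consumer<String> processor,
                                    int maxAttempts, long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                processor.accept(msg.payload());
                msg.ack(); // only reached when processing did not throw
                return true;
            } catch (RuntimeException processingFailure) {
                if (attempt == maxAttempts) {
                    break; // give up; the message stays unacked on the broker
                }
                try {
                    Thread.sleep(backoffMillis * attempt); // simple linear backoff
                } catch (InterruptedException interrupted) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        AckableMessage msg = new AckableMessage() {
            public String payload() { return "message 2"; }
            public void ack() { System.out.println("acked after " + calls[0] + " attempts"); }
        };
        // Simulated processing step that fails twice (e.g. database down), then succeeds.
        boolean acked = processWithRetry(msg, body -> {
            if (++calls[0] < 3) throw new IllegalStateException("database unavailable");
        }, 5, 10L);
        System.out.println("acked = " + acked);
    }
}
```

If `processWithRetry` returns false, the message has never been acknowledged, so it is still held by the queue and the application can decide what to do next.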
Thanks Tom...
But assume there is a database outage for a couple of minutes, and that is the reason for the failures. In that case I do not want to hold messages in my application for a few minutes for reprocessing!
Is there any PROPERTY we can set to 'unbind the message from the consumer' and put it back in the pool for reprocessing?
0 -
Hey @Ashok . Thanks for the question. We've been asked before for a "NACK" capability, to be able to send a message back to the queue. I'm gathering requirements around this feature request: what exactly would you want as expected behaviour? Since you have an exclusive queue, this receiver/consumer is the only one active, so it would simply receive the message again in callback right away. Is it that you don't want your consumer to have any state, therefore you don't want it to "hold on" to the message to retry again? Any guidance on your requirements would be useful. Thanks!
1 -
Hello Aaron, yes, this is what I am looking for; you said it correctly.
In simple terms:
- One Guaranteed Queue
- One Subscriber
- Will ACK the message in certain scenarios
- Will NACK the message in certain scenarios: here the message should be available for the Subscriber to process again (as good as a new message)
0 -
Hi.
While we're waiting for NACK functionality, can this scenario be achieved in the meantime with a TransactedSession? That is, calling commit() on it if the message was processed successfully, or rollback() if there was an error while processing, which would move the message back to the queue (or rather not remove it, allowing it to be processed later by another queue consumer).
Thanks
0 -
Hi @Anton, I like the idea, but using Transacted Sessions has a performance penalty - it's best used for multiple messages that must all be processed as one blob.
Back to @Ashok, in your example of the database outage you have 2 basic choices: detect the database problem and close the application, or keep the application open and pause. If you close the application, the pending messages are implicitly nacked, as @swenhelge mentioned. If you keep the application open then, as @Aaron mentioned, if you nack the messages they will simply be immediately resent to you, so that doesn't help.
This is fundamental to queue-based messaging: the queue is where your persistence is. All you have to do to be safe is not ack the message until it's been received by the database. In this way all failure modes are dealt with.
The only problem with this implementation is that you're holding memory for the pending messages. If that really is a problem, close the flow, then when you detect the database has started re-open it and the pending messages will be re-delivered.
0 -
@TomF thanks for clarifying. Can we still go this way if this penalty is acceptable? Honestly, I think NACK is an essential thing as it gives more flexibility. Not every system needs or wants to implement advanced mechanisms internally to deal with ACKs, and in a lot of cases moving the message back to the queue will be totally enough.
0 -
@Anton, it's an automatic rollback: "If a client application closes a Transacted Session without first committing the current Transaction, the API rolls back that Transaction, and it does not automatically reattempt the Transaction."
See https://docs.solace.com/Solace-PubSub-Messaging-APIs/API-Developer-Guide/Closing-Transactions.htm
1 -
@TomF I'm experimenting right now with the approach I suggested above (using TransactedSession). My test solution consists of a queue producer and a queue consumer. The producer sends 5 messages to the queue. The consumer accepts the first 2 (by calling transactedSession.Commit()), then only the 3rd one is rejected (by calling transactedSession.Rollback()). I set up a dead message queue and set Maximum Redelivery Count to 3. As a result, for some reason I got 3 messages in the DMQ (3, 4 and 5), but I expected only message 3 to go there. Am I missing something here?
0 -
Messages are put into the queue using a "normal" session. It's in the queue consumer (which is another process) that I create a transacted session and then a Flow using it. So my idea was, in the MessageReceived handler, to call TransactedSession.Commit() to remove the message from the queue or TransactedSession.Rollback() to return it. This sort of works, but when the number of redeliveries is exceeded, this message and all other messages currently sitting in the queue go to the DMQ.
0 -
@TomF Looking at this further, it looks like ClientAck is enough for me. The only question I have is how to restart the Flow in the right way so that the message will be "nacked" and re-delivered later. When I do Flow.Stop() and then Flow.Start(), the unacked message isn't re-delivered. Should I stop the flow, re-create the Session, then the Flow, and start it?
0 -
Hi @Anton, flow.stop() isn't enough, you need to use .close(). .stop() just pauses the delivery of messages and leaves the flow bound to the queue. When you reopen the flow you should see the re-deliveries then. I don't think you'll need to close the session, just the flow.
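To make the difference between pausing and unbinding concrete, here is a small in-memory toy model of the behaviour described above. It is only a sketch and does not use or reproduce the Solace API: `ToyQueue` and its methods are hypothetical names, message bodies are assumed to be unique strings, and the model covers a single consumer only.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

/** Toy in-memory model of one queue with one consumer flow; not the Solace API. */
final class ToyQueue {
    private final List<String> spool = new ArrayList<>();        // messages not yet acked, in order
    private final Set<String> delivered = new LinkedHashSet<>(); // sent on the current bind, awaiting ack
    private boolean bound;
    private boolean started;

    void publish(String message) { spool.add(message); }

    void bind()  { bound = true; started = true; } // open the flow
    void stop()  { started = false; }              // pause delivery, stay bound
    void start() { started = bound; }              // resume delivery if still bound
    void close() {                                 // unbind: unacked messages become deliverable again
        bound = false;
        started = false;
        delivered.clear();
    }

    void ack(String message) {
        spool.remove(message);
        delivered.remove(message);
    }

    /** Returns the next message not yet delivered on this bind, if any. */
    Optional<String> receive() {
        if (!started) return Optional.empty();
        for (String message : spool) {
            if (delivered.add(message)) return Optional.of(message); // add() is true only for new entries
        }
        return Optional.empty();
    }
}

final class ToyQueueDemo {
    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        for (int i = 1; i <= 5; i++) queue.publish("message " + i);

        queue.bind();
        for (int i = 1; i <= 5; i++) {
            String m = queue.receive().get();
            if (i != 2) queue.ack(m); // message 2 is never acked
        }

        queue.stop();
        queue.start();
        System.out.println("after stop/start:   " + queue.receive()); // Optional.empty

        queue.close();
        queue.bind();
        System.out.println("after close/reopen: " + queue.receive()); // Optional[message 2]
    }
}
```

In this model the unacked message stays marked as delivered for as long as the flow remains bound, which is why pausing and resuming does not bring it back, while closing and re-opening does.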
0 -
Found an issue - I was re-creating the Flow in the received message handler. Now that I've separated this into two different threads, it works as expected. The last question (I hope) - can someone explain the MaxUnackedMessages property on a Flow, and whether this property can somehow affect what I'm trying to achieve? Or maybe there are recommended values for it.
Thanks.
0 -
@Anton, I tried some variations on local transacted sessions. If I understand correctly, here's your sequence of receive messages:
1. Received, commit
2. Received, commit
3. Received, rollback
What happens then? Do you receive messages 4 and 5, or do you close the flow after getting 3? How are you receiving the messages, using IFlow.ReceiveMsg()?
Regarding MaxUnackedMessages, messages on a flow are sent using a windowing scheme. Up to this window size of messages can be sent to the application before the broker stops sending messages and waits for an ack to be received. It only applies if you are setting the Flow Property AckMode to ClientAck and manually acknowledging the messages. If you're using AckMode AutoAck, the acknowledgement of the message will be sent as soon as the Delegate returns.
So, in your case, if you're using ClientAck, you'll have up to MaxUnackedMessages messages waiting for your application to process. If you receive a message and don't call .ack() on it, when you close the flow the message will be marked as undelivered, just as if you'd called .rollback().
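The windowing idea can be illustrated with a toy model. This is a sketch of the general mechanism only, under the assumption that the window simply caps the number of delivered-but-unacked messages; `AckWindow` and its methods are hypothetical names and do not correspond to anything in the Solace API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of a delivery window that caps unacked messages; not the Solace API. */
final class AckWindow {
    private final int maxUnacked;
    private final Deque<Integer> waiting = new ArrayDeque<>(); // message ids still on the queue
    private final Set<Integer> unacked = new HashSet<>();      // delivered, not yet acked

    AckWindow(int maxUnacked, int messageCount) {
        this.maxUnacked = maxUnacked;
        for (int id = 1; id <= messageCount; id++) waiting.add(id);
    }

    /** Sends as many waiting messages as the window allows and returns their ids. */
    List<Integer> deliver() {
        List<Integer> sent = new ArrayList<>();
        while (unacked.size() < maxUnacked && !waiting.isEmpty()) {
            int id = waiting.poll();
            unacked.add(id);
            sent.add(id);
        }
        return sent;
    }

    /** An ack frees one slot in the window. */
    void ack(int id) { unacked.remove(id); }

    public static void main(String[] args) {
        AckWindow window = new AckWindow(2, 5);
        System.out.println(window.deliver()); // [1, 2]  window is now full
        System.out.println(window.deliver()); // []      nothing more until an ack arrives
        window.ack(1);
        System.out.println(window.deliver()); // [3]     one slot was freed
    }
}
```

In this model, a window of 1 means the next message is only sent after the previous one has been acked.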
0 -
@TomF with TransactedSession I was rolling back on message 3. I was not restarting the flow. After a few re-deliveries, message 3 and all messages behind it were put on the DMQ.
However, I successfully implemented my scenario with ClientAck and restarting the flow to simulate a "nack". So I'll stick with this solution and wait for a Nack() method to be added by you guys. Regarding MaxUnackedMessages - do I understand correctly that for my needs I should set it to 1, so that only one message at a time will be delivered to the client?
0