Hello There,
Using JCSMP API.
This might be repetetive query, appreciate if you point me to solution.
I am using Guaranteed Delivery Queue ‘DeliveryMode.PERSISTENT’ to publish a message on queue.
I have Single Consume for this Queue
endpoint_props.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
cons = session.createFlow(myXMLMessageListener, flow_prop, endpoint_props);
Messages are coming in a queue, consumer is reading messages, processing it and once processing is complteted successfully, message been Acknowledged, all good… (Below is Issue)
There are 5 messages on the queue
message 1 processed normally
message 2 consumed by consumer, while processing message there was some application issue and COULD NOT ACKNOWLEDGED the message…!!!
message 3, 4, 5 consumed and processed nomally and Acknowledged
MESSAGE 2 still on the queue and not been reprocessed…
To Reprocess message 2, application has to be restarted (so, basically reestablishing queue connection)
→ What needs to be done here to reprocess message 2 without restarting the application?
Hi @Ashok, the basic pattern here is that your application should keep hold of the message until it has been successfully acknowledged. So let’s say you receive a message, perform some data processing and acknowledge the message. If the data processing fails, provided you still have a copy of the message you can perform whatever business logic you need to - for instance reprocessing the data.
You should only remove references to the message object once you’re satisfied that acknowledge has been called successfully.
Thanks Tom…
But assume, there is database outage for couple of minutes and that is the reason of failures. Here I do not want to hold messages for few minutes into my application for reprocessing!!
Is there any PROPERTY we set to kind of ‘Unbind message from consumer’ and put it back in pool for reprocessing?
Hey @Ashok . Thanks for the question. We’ve been asked before for a “NACK” capability, to be able to send a message back to the queue. I’m gathering requirements around this feature request: what exactly would you want as expected behaviour? Since you have an exclusive queue, this receiver/consumer is the only one active, so it would simply receive the message again in callback right away. Is it that you don’t want your consumer to have any state, therefore you don’t want it to “hold on” to the message to retry again? Any guidance on your requirements would be useful. Thanks!
Hi.
While we’re waiting for NACK functionality, can this scenario be meanwhile achieved with a TransactedSession? Calling commit() on it if message was processed successfully or rollback() if there was an error while processing and moving it back to the queue (or rather not removing it and allowing to be further processed by another queue consumer) .
Hi @Anton, I like the idea, but using Transacted Sessions has a performance penalty - it’s best used for multiple messages that must all be processed as one blob.
Back to @Ashok, in your example of the database outage you have 2 basic choices: detect the database problem and close the application, or keep the application open and pause. If you close the application, the pending messages are implicitly nacked as @swenhelge mentioned. If you keep the application open, as @Aaron mentioned if you nack the messages they will simply be immediately resent to you, so that doesn’t help.
This is fundamental to a queuing based messaging: the queue is where your persistence is. All you have to do to be safe is not ack the message until it’s been received by the database. In this way all failure mechanisms are dealt with.
The only problem with this implementation is that you’re holding memory for the pending messages. If that really is a problem, close the flow, then when you detect the database has started re-open it and the pending messages will be re-delivered.
@TomF thanks for clarifying. Can we still go this way if this penalty is acceptable? Honestly, I think NACK is an essential thing as it gives more flexibility - not every system needs/wants to implement advanced mechanisms inside to deal with ACKs and in a lot of cases moving message back to the queue will be totally enough.
@Anton yes, of course you can use transacted sessions. Some frameworks use them under the hood for single messages - not what I’d recommend, but it does work.
@Anton, it’s an automatic rollback: “f a client application closes a Transacted Session without first committing the current Transaction, the API rolls back that Transaction, and it does not automatically reattempt the Transaction.”
@TomF I’m experimenting right now with approach I suggested above (using TransactedSession). My test solution consists of a queue producer and queue consumer. Producer sends 5 messages to the queue. Consumer accepts first 2 (by calling transactedSession.Commit()) then only 3rd one is being rejected (by calling transactedSession.Rollback()). I setup dead message queue and set Maximum Redelivery Count to 3. As a result for some reason I got 3 messages in the DMQ (3,4 and 5) but I expected only message 3 go there. Am I missing something here?
I’m also trying to achieve this with closing and opening Flow again, as this was suggested and “nacked” messages are suck in the queue and are not redelivered…
@Anton with your transacted session, how many messages are in a transaction? Are you sending one per transaction (so you have 5 transactions) or do you have one transaction with all 5 messages in it?
Messages are put into queue using “normal” session. It’s on Queue consumer (which is another process) where I create transacted session and then Flow using it. So, my idea was in MessageReceived handler call TransactedSession.Commit() to remove it from the queue or TransactedSession.Rollback() to return back. This is sort of working but when Number of Redeliveries exceeded this and all other messages currently sitting in the queue are going to DMQ.
@TomF Looking at this further and it looks like ClientAck is enough for me. The only question I have is how to restart Flow in a right way so message will be “nacked” and re-delivered later. When I do Flow.Stop() and then Flow.Start() unacked message isn’t being re-delivered. Should I stop flow, re-create Session, then Flow and start it?
Hi @Anton, flow.stop() isn’t enough, you need to use .close(). .stop() just pauses the delivery of messages and leaves the flow bound to the queue. When you reopen the flow you should see the re-deliveries then. I don’t think you’ll need to close the session, just the flow.