Reprocess messages that were not acknowledged by the consumer

Ashok Member Posts: 5

Hello there,
I am using the JCSMP API.
This might be a repetitive query; I'd appreciate it if you could point me to a solution.
1. I am publishing messages to a queue with guaranteed delivery (DeliveryMode.PERSISTENT).
2. I have a single consumer for this queue:
endpoint_props.setAccessType(EndpointProperties.ACCESSTYPE_EXCLUSIVE);
cons = session.createFlow(myXMLMessageListener, flow_prop, endpoint_props);
3. Messages arrive on the queue, the consumer reads and processes them, and once processing completes successfully each message is acknowledged. All good so far.
(The issue is below.)
4. There are 5 messages on the queue:
- message 1 is processed normally
- message 2 is consumed by the consumer, but an application issue occurs during processing and the message is NOT ACKNOWLEDGED
- messages 3, 4 and 5 are consumed, processed normally and acknowledged
- MESSAGE 2 is still on the queue and has not been reprocessed
- to reprocess message 2, the application has to be restarted (basically re-establishing the queue connection)

--> What needs to be done here to reprocess message 2 without restarting the application?

Answers

  • TomF Member, Employee Posts: 225 Solace Employee

    Hi @Ashok, the basic pattern here is that your application should keep hold of the message until it has been successfully acknowledged. So let's say you receive a message, perform some data processing and acknowledge the message. If the data processing fails, provided you still have a copy of the message you can perform whatever business logic you need to - for instance reprocessing the data.
    You should only remove references to the message object once you're satisfied that acknowledge has been called successfully.
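
The hold-and-retry pattern described in this reply can be sketched roughly as follows. This is a minimal, broker-free illustration under stated assumptions: `AckAfterSuccess`, `Message`, `processThenAck` and the retry parameters are hypothetical names introduced here, and `Message.ack()` only stands in for whatever acknowledge call the client API provides (in JCSMP with client acknowledgement, that would be `ackMessage()` on the received message).

```java
import java.util.function.Predicate;

public class AckAfterSuccess {

    /** Hypothetical stand-in for a received guaranteed message. */
    public interface Message {
        String payload();
        void ack(); // placeholder for the client API's acknowledge call
    }

    /**
     * Keeps the message in hand and retries processing up to maxAttempts times.
     * The message is acknowledged only after the processor reports success.
     *
     * @return true if the message was processed and acked; false if it is still
     *         unacked (and therefore still on the queue from the broker's view)
     */
    public static boolean processThenAck(Message msg, Predicate<String> processor,
                                         int maxAttempts, long backoffMillis) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            if (processor.test(msg.payload())) {
                msg.ack();
                return true;
            }
            if (attempt < maxAttempts) {
                try {
                    Thread.sleep(backoffMillis); // wait before the next attempt
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return false; // give up without acking
                }
            }
        }
        return false;
    }
}
```

If `processThenAck` returns false, nothing has been acknowledged, so the message is still on the queue and will be redelivered once the flow (or the application) is closed and re-opened.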

  • Ashok Member Posts: 5

    Thanks Tom...
    But suppose there is a database outage for a couple of minutes and that is the reason for the failures. In that case I do not want to hold messages in my application for a few minutes for reprocessing!
    Is there any PROPERTY we can set to 'unbind' the message from the consumer and put it back in the pool for reprocessing?

  • swenhelge Member, Employee Posts: 59 Solace Employee

    Hi,
    apparently the only way to negatively acknowledge messages is to close the FlowReceiver and re-establish it (create and start).

  • Ashok Member Posts: 5

    Yeah, that's the workaround; I will go with something similar.
    Thank you, folks.
    It would have been great if there were a property for this.

  • Aaron Member, Moderator, Employee Posts: 184 Solace Employee
    #7 Accepted Answer

    Hey @Ashok. Thanks for the question. We've been asked before for a "NACK" capability, to be able to send a message back to the queue. I'm gathering requirements around this feature request: what exactly would you want as the expected behaviour? Since you have an exclusive queue, this receiver/consumer is the only one active, so it would simply receive the message again in its callback right away. Is it that you don't want your consumer to have any state, and therefore you don't want it to "hold on" to the message to retry again? Any guidance on your requirements would be useful. Thanks!

  • Ashok Member Posts: 5

    Hello Aaron, yes this is what I am looking for, you said it correctly :)
    In simple terms:

    • One Guaranteed Queue
    • One Subscriber
      • Will ACK the message in certain scenarios
      • Will NACK the message in certain scenarios: here the message should become available for the subscriber to process again (just like a new message)
  • Anton Member Posts: 18

    Hi.
    While we're waiting for NACK functionality, can this scenario be achieved in the meantime with a TransactedSession? That is, calling commit() on it if the message was processed successfully, or rollback() if there was an error while processing, which would move the message back to the queue (or rather not remove it, allowing it to be processed by another queue consumer).

    Thanks

  • TomF Member, Employee Posts: 225 Solace Employee

    Hi @Anton, I like the idea, but using Transacted Sessions has a performance penalty - it's best used for multiple messages that must all be processed as one blob.

    Back to @Ashok, in your example of the database outage you have 2 basic choices: detect the database problem and close the application, or keep the application open and pause. If you close the application, the pending messages are implicitly nacked as @swenhelge mentioned. If you keep the application open, as @Aaron mentioned if you nack the messages they will simply be immediately resent to you, so that doesn't help.

    This is fundamental to queue-based messaging: the queue is where your persistence is. All you have to do to be safe is not ack the message until it's been received by the database. In this way all failure mechanisms are dealt with.

    The only problem with this implementation is that you're holding memory for the pending messages. If that really is a problem, close the flow; then, when you detect that the database is back up, re-open it and the pending messages will be re-delivered.

  • Anton Member Posts: 18

    @TomF thanks for clarifying. Can we still go this way if this penalty is acceptable? Honestly, I think NACK is essential as it gives more flexibility: not every system needs or wants to implement advanced internal mechanisms to deal with ACKs, and in a lot of cases moving the message back to the queue will be entirely sufficient.

  • TomF Member, Employee Posts: 225 Solace Employee

    @Anton yes, of course you can use transacted sessions. Some frameworks use them under the hood for single messages - not what I'd recommend, but it does work.

  • Anton Member Posts: 18

    @TomF btw, what will happen if the process crashes because of an unhandled exception without calling commit() or rollback()? Thanks

  • TomF Member, Employee Posts: 225 Solace Employee

    @Anton, it's an automatic rollback: "If a client application closes a Transacted Session without first committing the current Transaction, the API rolls back that Transaction, and it does not automatically reattempt the Transaction."

    See https://docs.solace.com/Solace-PubSub-Messaging-APIs/API-Developer-Guide/Closing-Transactions.htm

  • Anton Member Posts: 18

    @TomF I'm experimenting right now with the approach I suggested above (using TransactedSession). My test solution consists of a queue producer and a queue consumer. The producer sends 5 messages to the queue. The consumer accepts the first 2 (by calling transactedSession.Commit()), then rejects only the 3rd one (by calling transactedSession.Rollback()). I set up a dead message queue and set Maximum Redelivery Count to 3. As a result, for some reason I got 3 messages in the DMQ (3, 4 and 5), but I expected only message 3 to go there. Am I missing something here?

  • Anton Member Posts: 18

    I'm also trying to achieve this by closing and re-opening the Flow, as was suggested, but the "nacked" messages are stuck in the queue and are not redelivered...

  • TomF Member, Employee Posts: 225 Solace Employee

    @Anton with your transacted session, how many messages are in a transaction? Are you sending one per transaction (so you have 5 transactions) or do you have one transaction with all 5 messages in it?

  • Anton Member Posts: 18

    Messages are put into the queue using a "normal" session. It's in the queue consumer (which is another process) where I create a transacted session and then a Flow using it. So my idea was to call TransactedSession.Commit() in the MessageReceived handler to remove the message from the queue, or TransactedSession.Rollback() to return it. This sort of works, but when the number of redeliveries is exceeded, this message and all other messages currently sitting in the queue go to the DMQ.

  • Anton Member Posts: 18

    @TomF Looking at this further, it looks like ClientAck is enough for me. The only question I have is how to restart the Flow the right way so that the message is "nacked" and re-delivered later. When I call Flow.Stop() and then Flow.Start(), the unacked message isn't re-delivered. Should I stop the flow, re-create the Session, then the Flow, and start it?

  • TomF Member, Employee Posts: 225 Solace Employee

    Hi @Anton, flow.stop() isn't enough, you need to use .close(). .stop() just pauses the delivery of messages and leaves the flow bound to the queue. When you reopen the flow you should see the re-deliveries then. I don't think you'll need to close the session, just the flow.

  • Anton Member Posts: 18

    @TomF thanks, but I don't see a Close() method on the IFlow interface in .NET...

  • TomF Member, Employee Posts: 225 Solace Employee

    @Anton, sorry, I looked at the original question, which talked about JCSMP, and forgot you're on .NET... You're right, there's no .close(), so I think you'd need to .dispose(). This can't be done from the delegate.

  • Anton Member Posts: 18

    @TomF I assume after calling Dispose() I have to re-create flow. Am I right?

  • TomF Member, Employee Posts: 225 Solace Employee

    @Anton, Yes exactly. You're destroying the flow and re-creating it.

  • Anton Member Posts: 18

    @TomF thanks, going to try this now...

  • Anton Member Posts: 18

    @TomF so, getting closer but still not quite there.
    After this sequence:
    Flow.Stop()
    Flow.Dispose()
    Flow = Session.CreateFlow()
    Flow.Start()

    The message is re-delivered to another queue consumer, but the original one stops receiving new messages altogether.

  • Anton Member Posts: 18

    Found the issue: I was re-creating the Flow in the received-message handler. Now that I've separated the two onto different threads, it works as expected. One last question (I hope): can someone explain the MaxUnackedMessages property on a Flow, and can this property somehow affect what I'm trying to achieve? Or are there recommended values for it?

    Thanks.
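
Moving the flow teardown and re-creation off the message callback, as this post describes, might look roughly like the following in Java. It is only a sketch under stated assumptions: `FlowRestarter` and its nested `Flow` interface are hypothetical names, not part of any Solace API. `Flow` merely mirrors the `start()`/`close()` pair that JCSMP's `FlowReceiver` exposes (in .NET the corresponding teardown is `Dispose()`, per the replies above), and the factory is assumed to wrap whatever call creates a new flow on the existing session.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class FlowRestarter implements AutoCloseable {

    /** Hypothetical minimal view of a consumer flow. */
    public interface Flow {
        void start();
        void close();
    }

    private final Supplier<Flow> flowFactory; // assumed to create a flow on the existing session
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private Flow current; // after construction, only touched on the worker thread

    public FlowRestarter(Supplier<Flow> flowFactory) {
        this.flowFactory = flowFactory;
        this.current = flowFactory.get();
        this.current.start();
    }

    /**
     * Safe to call from inside the message callback: it only schedules the
     * restart. The close/create/start sequence runs on the worker thread.
     */
    public CompletableFuture<Void> requestRestart() {
        return CompletableFuture.runAsync(() -> {
            current.close();             // unacked messages become eligible for redelivery
            current = flowFactory.get(); // bind a fresh flow to the queue
            current.start();
        }, worker);
    }

    /** Closes the current flow on the worker thread, then stops the worker. */
    @Override
    public void close() {
        worker.execute(() -> current.close());
        worker.shutdown();
    }
}
```

A message handler that fails to process a message would call `requestRestart()` and return immediately, without acknowledging the message, rather than disposing of the flow itself.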

  • Ashok Member Posts: 5

    Thank you, folks, for your valuable time and input. Some of these might work, but I feel the real solution is NACK, so that developers can focus on the actual implementation rather than managing this in application logic.

  • TomF Member, Employee Posts: 225 Solace Employee

    @Anton, I tried some variations on local transacted sessions. If I understand correctly, here's your sequence of received messages:
    1. Received, commit
    2. Received, commit
    3. Received, rollback
    What happens then? Do you receive messages 4 and 5, or do you close the flow after getting 3? How are you receiving the messages, using IFlow.ReceiveMsg()?

    Regarding MaxUnackedMessages, messages on a flow are sent using a windowing scheme. Up to this window size of messages can be sent to the application before the broker stops sending and waits for an ack to be received. It only applies if you are setting the Flow Property AckMode to ClientAck and manually acknowledging the messages. If you're using AckMode AutoAck, the acknowledgement of the message will be sent as soon as the Delegate returns.

    So, in your case, if you're using ClientAck, you'll have MaxUnackedMessages waiting for your application to process. If you receive and don't call .ack() on the message, when you close the flow the message will be marked as undelivered, just as if you'd called .rollback()
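
The windowing behaviour described in this reply, together with the original question's "message 2 stays on the queue" symptom, can be illustrated with a toy in-memory model. This is not broker code and makes no claim about how the broker is implemented: `UnackedWindowModel` and its methods are hypothetical names, message IDs are assumed unique, and the model only encodes what the thread states (delivery pauses when the unacked window is full, and closing the flow makes unacked messages deliverable again).

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class UnackedWindowModel {
    private final int maxUnacked;                                  // models MaxUnackedMessages
    private final List<String> spool = new ArrayList<>();          // messages still on the queue, in order
    private final Set<String> outstanding = new LinkedHashSet<>(); // delivered but not yet acked

    public UnackedWindowModel(int maxUnacked, List<String> messages) {
        this.maxUnacked = maxUnacked;
        this.spool.addAll(messages);
    }

    /** Delivers not-yet-delivered messages in order until the unacked window is full. */
    public List<String> deliver() {
        List<String> delivered = new ArrayList<>();
        for (String id : spool) {
            if (outstanding.size() >= maxUnacked) {
                break; // window full: wait for an ack
            }
            if (outstanding.add(id)) { // false if already delivered on this flow
                delivered.add(id);
            }
        }
        return delivered;
    }

    /** An ack removes the message from the queue and frees one window slot. */
    public void ack(String id) {
        if (outstanding.remove(id)) {
            spool.remove(id);
        }
    }

    /** Closing the flow makes every delivered-but-unacked message deliverable again. */
    public void closeFlow() {
        outstanding.clear();
    }
}
```

In this model, with a window of 5 and messages 1 to 5, acknowledging 1, 3, 4 and 5 leaves message 2 on the queue but never redelivers it until `closeFlow()` is called. With a window of 1, a second message is not delivered until the first has been acknowledged.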

  • Anton Member Posts: 18

    @TomF with the TransactedSession I was rolling back on message 3. I was not restarting the flow. After a few re-deliveries, message 3 and all messages behind it were put on the DMQ.

    However, I successfully implemented my scenario with ClientAck and restarting the flow to simulate a "nack". So I'll stick with this solution and wait for you guys to add a Nack() method. Regarding MaxUnackedMessages: do I understand correctly that for my needs I need to set it to 1, so that only one message at a time is delivered to the client?
