Messages are not removed from the queue one by one after acknowledgement with Flow.Ack(message.ADMessageId)
Hi, I am consuming messages from a queue using .NET (C#) with MessageAckMode.ClientAck. The handler acknowledges each message with Flow.Ack(message.ADMessageId) immediately after receiving it. However, the messages are not removed from the queue one at a time as they are acknowledged. Instead, all of them are removed at once when the flow ends.
Answers
-
Greetings @Monjurul and welcome to the community!
Can you provide some additional clarity on the timing of these operations? Acknowledgements generated by a consumer are not always immediately sent to the broker, but instead are sent once per second or whenever the transport window (a configurable number of messages) is 60% full — whichever occurs first. Any unsent acknowledgements are also sent whenever the flow is closed (as you note).
Are you seeing messages remaining on the queue for greater than a second after acking? Would you like to see messages ack'ed more frequently? And lastly, can you share the code from your message handler?
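For reference, here is a minimal sketch of where those two settings could be tuned on the consumer side. It assumes the FlowProperties members AckTimerInMsecs and AckThreshold from the Solace .NET API; the values shown and the helper name CreateFastAckFlow are purely illustrative, so please check the API reference for the allowed ranges before relying on them.

```csharp
using System;
using SolaceSystems.Solclient.Messaging;

static class AckTuningSketch
{
    // Hypothetical helper: creates a client-ack flow that flushes
    // acknowledgements to the broker more often than the defaults.
    public static IFlow CreateFastAckFlow(
        ISession session,
        IQueue queue,
        EventHandler<MessageEventArgs> onMessage,
        EventHandler<FlowEventArgs> onFlowEvent)
    {
        var props = new FlowProperties
        {
            AckMode = MessageAckMode.ClientAck,
            // Assumed values: send pending acks after 100 ms instead of ~1 s...
            AckTimerInMsecs = 100,
            // ...or once 10% of the transport window is acked, instead of 60%.
            AckThreshold = 10,
        };
        return session.CreateFlow(props, queue, null, onMessage, onFlowEvent);
    }
}
```

Lower values trade a little extra network traffic for messages leaving the queue sooner after Flow.Ack() is called.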
-
I am following the GitHub sample code. I sent 5 messages to the queue and consumed them with the code below. However, all of the messages are removed at once after they have been consumed, not one by one.

Flow = Session.CreateFlow(new FlowProperties()
{
    AckMode = MessageAckMode.ClientAck,
},
Queue, null, HandleMessageEvent, HandleFlowEvent);
Flow.Start();
Console.WriteLine("Waiting for a message in the queue '{0}'...", queueName);
WaitEventWaitHandle.WaitOne();

private void HandleMessageEvent(object source, MessageEventArgs args)
{
    // Received a message
    Console.WriteLine("Received message.");
    using (IMessage message = args.Message)
    {
        try
        {
            // Expecting the message content as a binary attachment
            Console.WriteLine("Message content: {0}", Encoding.ASCII.GetString(message.BinaryAttachment));
            Flow.Ack(message.ADMessageId);
            Console.WriteLine($"Message acknowledged with message id :{message.ADMessageId}");
            // finish the program
            WaitEventWaitHandle.Set();
        }
        catch (Exception ex)
        {
            Console.WriteLine("Error acknowledging message: {0}", ex.Message);
        }
    }
}

public void HandleFlowEvent(object sender, FlowEventArgs args)
{
    // Received a flow event
    Console.WriteLine("Received Flow Event '{0}' Type: '{1}' Text: '{2}'",
        args.Event,
        args.ResponseCode.ToString(),
        args.Info);
}
-
So, a couple of things:
The sample code found on the GitHub site is designed to exit after it receives (and acks) at least one message. The code would look a little different for a real consumer application, and the main application thread would never exit.
I am guessing you are observing that the application is successfully pulling down several messages before exiting even though as-written it appears the code should only process one message per run.
This has to do with the speed of the thread switching. Calling WaitEventWaitHandle.Set() does not actually immediately return control to the WaitEventWaitHandle.WaitOne() calling site (which is on the main thread). The message handler dispatch thread will have sufficient time to continue pulling, processing, and acking before the application can exit.

You may be able to force the desired behavior by inserting a Thread.Yield() call after setting the wait handle, but this is also not fully guaranteed to work in all cases.

I could provide sample code that can do different things, but what would you like to see the application do? Run and pull exactly one message off the queue, then exit?
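If the goal is exactly one message per run, one option that does not depend on thread timing is to guard the handler itself, so that only the first delivery is processed and acked no matter how many more the dispatch thread hands over before the main thread wakes up. The sketch below uses only standard .NET threading types to illustrate the idea. The class name, the HandleMessage stand-in, and the simulated dispatcher thread are all hypothetical and are not part of the Solace API.

```csharp
using System;
using System.Threading;

class SingleMessageGuardSketch
{
    static readonly ManualResetEventSlim Done = new ManualResetEventSlim(false);
    static int handled;        // 0 = nothing processed yet, 1 = first message taken
    static int processedCount; // written only by the dispatcher thread

    // Stand-in for HandleMessageEvent; messageId plays the role of ADMessageId.
    static void HandleMessage(long messageId)
    {
        // Only the first caller flips 0 -> 1; later deliveries are skipped, not acked.
        if (Interlocked.CompareExchange(ref handled, 1, 0) != 0)
        {
            return;
        }

        Console.WriteLine("Processing and acking message {0}", messageId);
        processedCount++;
        Done.Set(); // wake the main thread; it may not run immediately
    }

    static void Main()
    {
        // Simulated dispatch thread that keeps delivering after the first message.
        var dispatcher = new Thread(() =>
        {
            for (long id = 1; id <= 5; id++)
            {
                HandleMessage(id);
            }
        });
        dispatcher.Start();

        Done.Wait();       // same role as WaitEventWaitHandle.WaitOne()
        dispatcher.Join(); // only so the count below is read safely
        Console.WriteLine("Messages processed: {0}", processedCount);
    }
}
```

In a real client-ack flow, the skipped messages would simply stay unacknowledged, so they should remain on the queue and be available for delivery on a later run.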