C# consumer flow dispose
Hi
I'm new to Solace + this is my first post, so be gentle :D
I'm building a worker (C#) that consumes messages off a Solace Queue using the SolClient messaging API.
On application shutdown, I attempt to stop & dispose the flow. See code below:
_flow = session.CreateFlow(flowProperties, queue, null, HandleMessageEvent, HandleFlowEvent); _logger.LogInformation("Subscribed to queue {MessageQueue}", Q); do { _waitEventWaitHandle.WaitOne(); } while (!stoppingToken.IsCancellationRequested && !_isSessionDown); _flow.Stop(); var sw = Stopwatch.StartNew(); _flow.Dispose(); _logger.LogInformation("Flow disposed in {Milliseconds}ms.", sw.ElapsedMilliseconds);
The result - Dispose takes a while to complete.
Note: Above flow window size is set to default (255). When I adjust the window size, making it smaller, the dispose seems to complete quicker.
Any idea what might cause the dispose to take so long?
Thanks
Best Answer
-
Hi Shaun,
You are correct, that is almost assuredly what is happening. Your message handler sets the event on the very first message. However if it takes so long to process that message, there may be up to 10000 bytes buffered (messages 2-255 depending on message size) that will be processed before our internal 'context' thread will look for the dispose event and process that. In fact, as the kernel scheduling of events on file descriptors (the native API uses poll) is indeterminate, there many be several reads of 10000 bytes performed before the dispose is handled.
This will serve to make it unpredictable how many messages are processed before dispose() closes the flow.
Certainly if your intent is to only process one message per flow open/close, you should set the flow windowize to 1. This would be a poorly performing solution however, as there is a lot of overhead to flow-open/flow-close which can be eliminated by handling streams of messages.
Also in general, the application should not spend 1-3 seconds in a callback. The .NET API is designed for high performance, low latency and few buffer copies. One of the ways this is achieved is by calling the application callback directly from the internal callback. This gives the application great power and with great power comes great responsibility. Holding up the context thread will lead to unexpected side effects like this, blocking or slowing down other threads and interfering with the protocol.
If your per message processing is known to be slow, it is recommended that you just queue the message in the callback, and let another thread process that queue.
Regards,
Ragnar
0
Answers
-
Hi,
Can you provide some numbers? What do you mean by 'a while' and how big a difference does the window size make?
In general 'flow.Dispose()' is a simple non-blocking operation that may cause some garbage collection as objects are destroyed. However it can be held up if the callback thread is running slow or blocked. As your main loop does nothing but wait for a signal, the question the comes to mind is what is generating that signal?
Is the signal generated in a callback on received message or event? What does that callback do after sending the signal?
Regards,
Ragnar
0 -
Hey Ragnar
Thank you for your response.
With a window size of 255, it seems to take anywhere between 0 and 60s to dispose.
With a window size of 1, it disposes instantly.
So handling a message takes between 1-3s, so I guess you could call it a slow-ish consumer.
Same code below + message handler code sending the signal to unblock:
_flow = session.CreateFlow(flowProperties, queue, null, HandleMessageEvent, HandleFlowEvent); _logger.LogInformation("Subscribed to queue {MessageQueue}", Q); do { _waitEventWaitHandle.WaitOne(); } while (!stoppingToken.IsCancellationRequested && !_isSessionDown); _flow.Stop(); var sw = Stopwatch.StartNew(); _flow.Dispose(); _logger.LogInformation("Flow disposed in {Milliseconds}ms.", sw.ElapsedMilliseconds);
The message handler below:
private void HandleMessageEvent(object? source, MessageEventArgs args) { try { _logger.LogInformation("Received message."); using var message = args.Message; // Process message. This would average between 1-3 seconds. _flow.Ack(message.ADMessageId); _logger.LogInformation("Message acknowledged."); } catch (Exception ex) { _logger.LogError("Exception thrown: {0}", ex.Message); } finally { _waitEventWaitHandle.Set(); } }
I do see messages still being handled while the Dispose is 'blocked'. Its like its trying to drain the flow before disposing.
Hope this helps.
Thanks
Shaun
0 -
Hi Shaun,
You are correct, that is almost assuredly what is happening. Your message handler sets the event on the very first message. However if it takes so long to process that message, there may be up to 10000 bytes buffered (messages 2-255 depending on message size) that will be processed before our internal 'context' thread will look for the dispose event and process that. In fact, as the kernel scheduling of events on file descriptors (the native API uses poll) is indeterminate, there many be several reads of 10000 bytes performed before the dispose is handled.
This will serve to make it unpredictable how many messages are processed before dispose() closes the flow.
Certainly if your intent is to only process one message per flow open/close, you should set the flow windowize to 1. This would be a poorly performing solution however, as there is a lot of overhead to flow-open/flow-close which can be eliminated by handling streams of messages.
Also in general, the application should not spend 1-3 seconds in a callback. The .NET API is designed for high performance, low latency and few buffer copies. One of the ways this is achieved is by calling the application callback directly from the internal callback. This gives the application great power and with great power comes great responsibility. Holding up the context thread will lead to unexpected side effects like this, blocking or slowing down other threads and interfering with the protocol.
If your per message processing is known to be slow, it is recommended that you just queue the message in the callback, and let another thread process that queue.
Regards,
Ragnar
0 -
Hi Ragnar
Thanks again for the response.
Just to be clear. I want to process more than one message per flow, this is why I have the do...while loop around the wait/blocking operation. See below:
do { _waitEventWaitHandle.WaitOne(); } while (!stoppingToken.IsCancellationRequested && !_isSessionDown);
Every time a message is handled, I send a signal to the blocked thread, In order to evaluate the cancellation token - just in case the application is trying to shut down gracefully.
I might need to go back to the drawing board. The reason I am doing the 'slow' processing in the callback, is because I want to make sure that I do process the message before ack-ing it. Perhaps I should rather persist these message 'instructions' to store (db or persisted cache), then process them in another process. This way if the application dies, I can recover those 'instructions' from store. Also persisting these messages and not processing them, should put less strain on the callback.
Would you agree?
Thanks
Shaun
0 -
Hi Shaun,
Please be aware there is no requirement to acknowledge the message in the callback. So you don't need to go the lengths of persisting the message in the callback just so you can safely acknowledge it.
You must acknowledge the message before you Dispose the flow is the only requirement. So you could simply put all your 'process message' logic in a thread that is reading messages from your own defined message queue. The acknowledge the message from that thread.
It may also be possible to use an executor service or pool but I would avoid starting a new thread per message.
Ragnar
0 -
Thanks Ragnar,
That does make sense.
I am still a bit unclear on how the use the Flow concept optimally.
Is it acceptable to have one Flow instance for the duration of the application?
The only reason I would want to stop/dispose the flow & re-create, is if I have a message ack that failed. My understanding is that creating a new flow will send those un-acked message again.
Thanks
Shaun
0 -
Hi Shaun,
Yes that is acceptable. A 'Flow' represents the connection between your application and an endpoint on the broker, typically a queue. A session represents the connection between your application and the broker itself. Typically most applications have one session, and one flow for the duration of the application.
However it possible to conceive other scenarios where an application may need to connect to 2 or more queues, in which case there might be multiple flows in the session. Or applications sharing a queue where you may want to open/dispose a flow many times. Although in the latter case you'd almost always be far better using a non-exclusive queue and just keep the flow open.
And as you already know, you may want to dispose/re-open a flow to force redelivery of unacknowledged messages.
Regards,
Ragnar
0 -
Hey @Ragnar and @shaunvdberg , great convo here around threading and Flows in C#. As a Java guy, our API is a bit different.
So, in C# (and C) the callback that the Flow runs when it receives a message is on the API Context thread, and there's only one of those, so blocking up that thread is bad for a number of reasons. And since you can acknowledge messages from anywhere, any thread, a typical setup for an app that might take a while to process messages would be:
onMessage (API thread) -> take message, place in linked list or ring buffer or something, return (Application-owned thread) -> block taking messages off linked list or ring buffer do processing ACK message from there check if app is suppposed to exit?
Which I guess would lend itself well to building a multi-threaded parallel processor? You could have multiple application threads reading from that linked list or in-memory queue and processing in parallel. (IF ORDER DOESN'T MATTER).
Shaun: why/when would you want to force a redelivery of a message from the queue (by closing the Flow)? Like, if it couldn't be processed/parsed, or if some downstream system you're sending it to is unavailable? You could build some retry logic into your processing function?
0 -
Thanks for the feedback @Ragnar @Aaron
Your suggestion(s) definitely assisted me in achieving better flow in my application. Also gave me a deeper understand of the solace client & how it works - much appreciated.
To answer your question Aaron: not too concerned with malformed messages - I will never be able to process them, no matter how many times I try - so I will throw these message onto another Q or db entry etc. I was more concerned about downstream systems being down or unavailable - so to your point - some retry logic here would make sense.
1 -
Hello, I am new to solace and I have a question about having multiple flows.
Consider the following situation:
A consumer is consuming messages from 2 queues. Then the consumer will create 2 flows.
Here the consumer is not just consuming the messages, it also stores/processes the messages. And it would be ideal to send acknowledgement only when the consumer successfully stores/processes the message.
Now that you have a consumer, consuming from multiple queues then you need to know the flow instance at the time of sending an acknowledgement.
How to get flow instances? Or at least know which flow is used when the message was consumed?
I am using .NET library 10.17.0
0 -
Hello, @laadpiyush
Great question. I'm looking into this and have some ideas.
We don't want to lose this discussion though buried deep in another discussion. While we're formulating our answer, can you create a completely separate post for this so the information can be easily found again in the future.
Thanks
Ragnar
0