How to set up the ack timer for guaranteed messaging
Hi All,
I am trying to implement the Solace exchange pattern called Topic to Queue Mapping (https://solace.com/samples/solace-samples-dotnet/topic-to-queue-mapping/). The architecture is 1 producer and 1 queue with 2 consumers (access type is non-exclusive).
To account for processing load, I assume the consumer will not ack immediately, so I added Thread.Sleep before the ack. I then discovered that when I set the sleep time to 10 seconds, Solace did not delete the data from the queue and retransmitted it again and again.
I thought there might be an ack timer setting in Solace, so I changed the session properties, for example sessionProps.ADPublishAckTimerInMsecs = 60000, and also extended the window size (sessionProps.ADPublishWindowSize = 500), but it does not seem to work. I also could not find similar settings in the Solace broker UI. (I tested both SessionProperties.AckMode.PER_MSG and SessionProperties.AckMode.WINDOWED as well.)
I am wondering whether anyone has tested the same scenario and figured out how to adjust the ack timer in Solace. Or how does Solace calculate the ack time?
Looking forward to your response, thank you.
Comments
-
-
Hi @alhung, Solace acknowledgement handling is highly tuned and it's usually best not to change the settings you were accessing. The acknowledgement scheme is actually pretty simple from a client point of view: only acknowledge the message once you're sure you've finished with it.
So, with your topic to queue mapping example, I recommend keeping it simple. Don't use the SessionProperties you mention, leave them at the default. Then when you create your flow, set the ackmode to client:
Flow = Session.CreateFlow(new FlowProperties() { AckMode = MessageAckMode.ClientAck }, ...
This stops the API sending acknowledgements automatically so you can send them when you need to by calling Flow.Ack:
private void HandleMessageEvent(object source, MessageEventArgs args)
{
    using (IMessage message = args.Message)
    {
        // Process the message. You might want to put your delay here.

        // ACK the message
        Flow.Ack(message.ADMessageId);
        ...
    }
}
See the SolaceSamples GitHub for more details, specifically the QueueConsumer.
The parameters you were accessing are low-level API details of how acknowledgements are sent back to the broker, and are there primarily for performance tuning. Let's get the basics working first.
2 -
Hi @TomF, thanks for your replies. Please allow me to describe my question again with code.
When the message is received, I call Thread.Sleep before the ack because I want to simulate a consumer that first does some work (e.g. parses data, inserts into a database, and so on), which means the consumer will not ack to the broker immediately.
https://imgur.com/ziZzWmM
private void HandleMessageEvent(object source, MessageEventArgs args)
{
    using (IMessage message = args.Message)
    {
        // Expecting the message content as a binary attachment
        ...
        // set ack delay
        Thread.Sleep(Convert.ToInt32(this.txtAckdelay.Text));
        // ACK the message
        Flow.Ack(message.ADMessageId);
Based on the producer sample code, it sends different data in a loop, so the consumer will be late to ack each message.
for (int i = 0; i < TotalMessages; i++)
{
    // Create the message content as a binary attachment
    message.BinaryAttachment = Encoding.ASCII.GetBytes(
        string.Format("Topic to Queue Mapping Tutorial! Message ID: #{0} data{1}", idx, i));

    // Send the message to the queue on the Solace messaging router
    SetText("Sending message ID " + i + " to topic " + tutorialTopic.Name + " mapped to queue " + queueName + "...");
    ReturnCode returnCode = session.Send(message);
Then I set the ack delay to 7 seconds. As you can see in the following image, the queue data was not deleted and the consumer kept receiving data. That is why I would like to adjust the ack timer: I was guessing that the broker considered the ack late and assumed the consumer had not received the data successfully.
Do you have any suggestions if I don't set the session properties? (In fact, I tried many session properties and none of them resolved the situation.) Basically, my code follows the Solace sample code, just separated into Producer and Consumer, and for load-balancing purposes I set the access type to NonExclusive.
AccessType = EndpointProperties.EndpointAccessType.NonExclusive
0 -
You'll need to make sure your Flow (not session) ackmode property is set to client:
AckMode = MessageAckMode.ClientAck
But that really should be all you need to do. Note that there is some batching of acknowledgements in the API, so you may see a small delay before all messages show as deleted from the queue. If you like, you could increase the API logging level and see when the API sends the acknowledgements.
0 -
Hi @TomF, yes, the AckMode has been ClientAck from the beginning:
Flow = session.CreateFlow(new FlowProperties()
    {
        AckMode = MessageAckMode.ClientAck
    },
    queue, null, HandleMessageEvent, HandleFlowEvent);
Flow.Start();
And I am sure that the queue data was not deleted when the consumer was late to ack, so the broker sent the data again and again.
Thanks for the suggestion, I will see how to implement it in the code.
0 -
@alhung your comment about the broker re-sending the message is very interesting. The broker will only send the message again if the flow is destroyed, otherwise the message will stay on the queue pending acknowledgement. Is it possible you're destroying the flow before the acknowledgment is sent?
0 -
Hi @TomF, my code is below. Since I am just trying to figure out how Solace works with Topic to Queue Mapping, I basically followed the sample code (https://github.com/SolaceSamples/solace-samples-dotnet/blob/master/src/TopicToQueueMapping/TopicToQueueMapping.cs), converted it to a Windows Forms app without big changes, and separated it into producer and consumer. I even commented out the cleanup, so the flow should not be destroyed.
//finally
//{
//    // Dispose Solace Systems Messaging API
//    ContextFactory.Instance.Cleanup();
//}
//SetText("Finished.");
//Code started from click button
private void btnSub_Click(object sender, EventArgs e)
{
    Thread t1 = new Thread(Run);
    t1.Start();
}
// Initialize Solace Systems Messaging API with logging to console at Debug level
ContextFactoryProperties cfp = new ContextFactoryProperties()
{
    SolClientLogLevel = SolLogLevel.Debug
};
cfp.LogToConsoleError();
ContextFactory.Instance.Init(cfp);
try
{
    // Context must be created first
    using (IContext context = ContextFactory.Instance.CreateContext(new ContextProperties(), null))
    {
        ...
        // Create session properties
        SessionProperties sessionProps = new SessionProperties()
        {
            Host = host,
            VPNName = vpnname,
            UserName = username,
            Password = password,
            ReconnectRetries = DefaultReconnectRetries,
            IgnoreDuplicateSubscriptionError = true
        };
        sessionProps.GdWithWebTransport = true;
        sessionProps.AckEventMode = SessionProperties.AckMode.PER_MSG;
        sessionProps.ADPublishAckTimerInMsecs = 60000;

        // Connect to the Solace messaging router
        Console.WriteLine("Connecting as {0}@{1} on {2}...", username, vpnname, host);
        using (ISession session = context.CreateSession(sessionProps, null, null))
        {
            returnCode = session.Connect();
            if (returnCode == ReturnCode.SOLCLIENT_OK)
            {
                Console.WriteLine("Session successfully connected.");
                if (session.IsCapable(CapabilityType.PUB_GUARANTEED)
                    && session.IsCapable(CapabilityType.SUB_FLOW_GUARANTEED)
                    && session.IsCapable(CapabilityType.ENDPOINT_MANAGEMENT)
                    && session.IsCapable(CapabilityType.QUEUE_SUBSCRIPTIONS))
                {
                    SetText("All required capabilities supported.");
                    TestConsumer(session);
private void TestConsumer(ISession session)
{
    // Provision the queue
    string queueName = "testqueue3";

    // Create the queue
    using (IQueue queue = ContextFactory.Instance.CreateQueue(queueName))
    {
        // Set queue permissions to "consume" and access-type to "non-exclusive"
        EndpointProperties endpointProps = new EndpointProperties()
        {
            Permission = EndpointProperties.EndpointPermission.Consume,
            AccessType = EndpointProperties.EndpointAccessType.NonExclusive,
        };

        // Provision it, and do not fail if it already exists
        session.Provision(queue, endpointProps,
            ProvisionFlag.IgnoreErrorIfEndpointAlreadyExists | ProvisionFlag.WaitForConfirm, null);
        SetText("Queue " + queueName + " has been created and provisioned.");

        // Add subscription to the topic mapped to the queue
        ITopic tutorialTopic = ContextFactory.Instance.CreateTopic("T/mapped/topic/sampleMsg");
        session.Subscribe(queue, tutorialTopic, SubscribeFlag.WaitForConfirm, null);

        Flow = session.CreateFlow(new FlowProperties()
            {
                AckMode = MessageAckMode.ClientAck
            },
            queue, null, HandleMessageEvent, HandleFlowEvent);
        Flow.Start();

        // Block the current thread until a confirmation is received
        CountdownEvent.Wait();
    }
}
private void HandleMessageEvent(object source, MessageEventArgs args)
{
    // Received a message
    Console.WriteLine("Received message.");
    using (IMessage message = args.Message)
    {
        SetText(Encoding.ASCII.GetString(message.BinaryAttachment));
        // ACK the message
        Thread.Sleep(Convert.ToInt32(this.txtAckdelay.Text));
        //args.Message.AckImmediately = true;
        Flow.Ack(message.ADMessageId);
        //CountdownEvent.Signal();
    }
}
public void HandleFlowEvent(object sender, FlowEventArgs args)
{
    // Received a flow event
    SetText("Received Flow Event " + args.Event + " Type: " + args.ResponseCode.ToString() + " Text: " + args.Info);
}
0 -
@alhung thanks for the code. Firstly, when you receive a message the message data shouldn't be overwritten, but that's what you're doing with the line:
SetText(Encoding.ASCII.GetString(message.BinaryAttachment));
I'm not sure what you're trying to do here, but you're reading the message payload ("Binary Attachment") and then overwriting it.
Secondly, it would be worth checking the return code from Flow.Ack(): are you getting SOLCLIENT_OK?
Lastly, have a look at https://docs.solace.com/API-Developer-Online-Ref-Documentation/net/html/3a13818a-ef45-d3ac-6e03-183f97b8822c.htm, particularly:
Consequently, applications must not block in callback/delegate routines.
Which, unfortunately, is what you're doing with:
Thread.Sleep(Convert.ToInt32(this.txtAckdelay.Text));
What I recommend you do is pass the received message to a data structure inside HandleMessageEvent(). Then in your application thread, read the messages in your data structure, apply your delay and then acknowledge them.
0 -
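The hand-off TomF describes above could look roughly like the following sketch. It is plain .NET with no Solace types so that it runs on its own: AckHandoff, PendingMessage and AckHandoffDemo are hypothetical names, and the ack delegate is an assumed stand-in for a call such as Flow.Ack(message.ADMessageId) in the real consumer. The callback side only copies what it needs and enqueues it; the slow work and the acknowledgement happen on an application thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical container for the data copied out of a message inside the callback.
public sealed class PendingMessage
{
    public long AdMessageId { get; }
    public byte[] Payload { get; }

    public PendingMessage(long adMessageId, byte[] payload)
    {
        AdMessageId = adMessageId;
        Payload = payload;
    }
}

public sealed class AckHandoff
{
    private readonly BlockingCollection<PendingMessage> _pending =
        new BlockingCollection<PendingMessage>();
    private readonly Action<long> _ack;      // assumption: wraps Flow.Ack(id) in a real consumer
    private readonly int _processingDelayMs; // simulated processing time per message

    public AckHandoff(Action<long> ack, int processingDelayMs)
    {
        _ack = ack;
        _processingDelayMs = processingDelayMs;
    }

    // Call this from the message callback: it copies the payload and enqueues it,
    // then returns without waiting for the message to be processed.
    public void OnMessage(long adMessageId, byte[] payload)
    {
        _pending.Add(new PendingMessage(adMessageId, (byte[])payload.Clone()));
    }

    // Signals that no more messages will arrive so ProcessLoop can finish.
    public void CompleteAdding()
    {
        _pending.CompleteAdding();
    }

    // Run this on an application thread: do the slow work, then acknowledge.
    public void ProcessLoop()
    {
        foreach (PendingMessage m in _pending.GetConsumingEnumerable())
        {
            Thread.Sleep(_processingDelayMs); // stands in for parsing, database inserts, etc.
            _ack(m.AdMessageId);
        }
    }
}

public static class AckHandoffDemo
{
    // Feeds messageCount fake messages through the hand-off and returns the
    // message IDs in the order they were acknowledged.
    public static List<long> Run(int messageCount, int processingDelayMs)
    {
        var acked = new List<long>();
        var handoff = new AckHandoff(id => acked.Add(id), processingDelayMs);

        var worker = new Thread(handoff.ProcessLoop);
        worker.Start();

        for (long id = 1; id <= messageCount; id++)
        {
            handoff.OnMessage(id, new byte[] { 1, 2, 3 });
        }

        handoff.CompleteAdding();
        worker.Join();
        return acked;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run(3, 50)));
    }
}
```

In the consumer from this thread, the callback would presumably call OnMessage with message.ADMessageId and message.BinaryAttachment before the using block disposes the message, which is why the sketch copies the payload rather than keeping a reference to the message itself.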
Hi @TomF, thanks for the replies.
The purpose of SetText is just to show the information on the Windows Form. I did not overwrite the message: the sample code uses the following code to display the message, and I just use a callback to show it in the UI.
[Sample code]
Console.WriteLine("Message content: {0}", Encoding.ASCII.GetString(message.BinaryAttachment));
[My code]
SetText(Encoding.ASCII.GetString(message.BinaryAttachment));
delegate void SetTextCallback(string text);
private void SetText(string text)
{
    if (this.txtSubData.InvokeRequired)
    {
        SetTextCallback d = new SetTextCallback(SetText);
        this.Invoke(d, new object[] { text });
    }
    else
    {
        string time = DateTime.Now.ToString();
        this.txtSubData.Text += time + " " + text + "\r\n";
    }
}
0 -
Secondly, for Flow.Ack, my understanding is that it acks to the broker with the message ID (it looks like a sequence number), so that the broker (Solace) can confirm the consumer got the data and can delete it from the queue. The ack message is shown in the image below. SOLCLIENT_OK is only used for the connection check.
https://imgur.com/undefined
So, after the consumer subscribed the queue to the topic, it created the flow and waited for messages.
// Add subscription to the topic mapped to the queue
ITopic tutorialTopic = ContextFactory.Instance.CreateTopic("T/mapped/topic/sampleMsg");
session.Subscribe(queue, tutorialTopic, SubscribeFlag.WaitForConfirm, null);
Flow = session.CreateFlow(new FlowProperties()
    {
        AckMode = MessageAckMode.ClientAck
    },
    queue, null, HandleMessageEvent, HandleFlowEvent);
Flow.Start();
// block the current thread until a confirmation received
CountdownEvent.Wait();
Thanks for the link to the IContext Interface. So I should not directly call Thread.Sleep to simulate a consumer that is busy and late to ack. I may not really understand the concept of ContextFactory yet, so I need some time to study it.
0 -
Hi @alhung, my misunderstanding of SetText() made me laugh - I'm so wrapped up in the messaging API I just assumed you were calling message.setText!
Many calls to the Solace API return a success or fail code. In the case of Flow.Ack(), the documentation states that a return other than SOLCLIENT_OK is highly unlikely.
I'd agree that understanding the idea of Context and Session is important. In essence, the Context is what drives the network (TCP socket) interface, so all network activity happens on the Context thread. The callbacks/delegates (in this case the message receive callback, HandleMessageEvent) run in the Context thread. If you call sleep in your callback, the API can no longer send or receive data, including acknowledgements.
1 -
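The effect TomF describes can be illustrated without the Solace API. The sketch below is a simulation only: ContextThreadSketch and DispatchAll are hypothetical names, and a plain thread stands in for the Context thread. It measures how long that single dispatcher thread is tied up when the callback sleeps, compared with a callback that hands the slow work to another thread.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;

public static class ContextThreadSketch
{
    // Simulates one dispatcher thread (standing in for the Context thread)
    // delivering messageCount messages to onMessage. Returns how long the
    // dispatcher was busy, in milliseconds.
    public static long DispatchAll(int messageCount, Action<int> onMessage)
    {
        var stopwatch = Stopwatch.StartNew();
        var dispatcher = new Thread(() =>
        {
            for (int i = 0; i < messageCount; i++)
            {
                // Nothing else can run on this thread until the callback returns.
                onMessage(i);
            }
        });
        dispatcher.Start();
        dispatcher.Join();
        return stopwatch.ElapsedMilliseconds;
    }

    public static void Main()
    {
        const int delayMs = 200;

        // Callback that blocks: the dispatcher is stuck for the whole delay, per message.
        long blocked = DispatchAll(3, i => Thread.Sleep(delayMs));

        // Callback that hands off: the slow work runs on separate threads.
        var workers = new List<Thread>();
        long handedOff = DispatchAll(3, i =>
        {
            var worker = new Thread(() => Thread.Sleep(delayMs));
            worker.Start();
            workers.Add(worker);
        });
        foreach (Thread worker in workers)
        {
            worker.Join();
        }

        Console.WriteLine("Dispatcher busy with blocking callback: {0} ms", blocked);
        Console.WriteLine("Dispatcher busy with hand-off callback: {0} ms", handedOff);
    }
}
```

The exact numbers depend on the machine, but the blocking variant keeps the dispatcher busy for roughly the sum of all the delays, which in the real API is time during which the Context thread cannot service the socket.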