How to set up the ack timer for guaranteed messaging

alhung
alhung Member Posts: 14

Hi All,
I am trying to implement the Solace exchange pattern called Topic to Queue Mapping (https://solace.com/samples/solace-samples-dotnet/topic-to-queue-mapping/). The architecture is 1 producer and 1 queue with 2 consumers (access type is non-exclusive).
To account for load, I assume the consumer will not ack immediately, so I added Thread.Sleep before the ack. I then found that when I set the sleep time to 10 seconds, Solace did not delete the data from the queue and re-transmitted it again and again.
I thought there might be an ack timer setting in Solace, so I changed the session properties, e.g. sessionProps.ADPublishAckTimerInMsecs = 60000, and also extended the window size (sessionProps.ADPublishWindowSize = 500), but it does not seem to work. I also could not find a similar setting in the Solace broker UI. (I tested SessionProperties.AckMode.PER_MSG and SessionProperties.AckMode.WINDOWED as well.)
I am wondering whether anyone has tested the same scenario and figured out how to adjust the ack timer in Solace. Or how does Solace calculate the ack time?
Looking forward to your response, thank you.

Comments

  • alhung
    alhung Member Posts: 14
    edited December 2020 #2

    The code is based on the sample code provided by Solace, but changed to a Windows Forms application. As the screenshot shows, once I set the ack delay to 10 seconds, the messages in the queue were not deleted and the broker kept sending data.

  • TomF
    TomF Member, Employee Posts: 412 Solace Employee

    Hi @alhung, Solace acknowledgement handling is highly tuned and it's usually best not to change the settings you were accessing. The acknowledgement scheme is actually pretty simple from a client point of view: only acknowledge the message once you're sure you've finished with it.

    So, with your topic to queue mapping example, I recommend keeping it simple. Don't use the SessionProperties you mention, leave them at the default. Then when you create your flow, set the ackmode to client:

     Flow = Session.CreateFlow(new FlowProperties()
                    {
                        AckMode = MessageAckMode.ClientAck
                    },
    ...
    

    This stops the API sending acknowledgements automatically, so you can send them when you need to by calling Flow.Ack:

      private void HandleMessageEvent(object source, MessageEventArgs args)
            {
                using (IMessage message = args.Message)
                {
                    // Process the message.  You might want to put your delay here.
                    // ACK the message
                    Flow.Ack(message.ADMessageId);
                   ...
                }
            }
    

    See the SolaceSamples GitHub for more details, specifically the QueueConsumer.

    The parameters you were accessing are low level API details of how acknowledgements are sent back to the broker, and are there primarily for performance tuning. Let's get the basics working first :smiley:

  • alhung
    alhung Member Posts: 14

    Hi @TomF, thanks for your replies. Please allow me to describe my question again with code.

    When the message is received, I call Thread.Sleep before the ack, because I want to simulate a consumer that is doing some work (e.g. parsing data, inserting into a database, and so on), which means the consumer will not ack to the broker immediately.
    https://imgur.com/ziZzWmM

    private void HandleMessageEvent(object source, MessageEventArgs args)

        {
            using (IMessage message = args.Message)
            {
                // Expecting the message content as a binary attachment
                ...
                //set ack delay
                Thread.Sleep(Convert.ToInt32(this.txtAckdelay.Text));
                // ACK the message
                Flow.Ack(message.ADMessageId);
    

    The producer, based on the sample code, sends different data in a loop, so the consumer will be late to ack each message.

    for (int i = 0; i < TotalMessages; i++)

                {
                    // Create the message content as a binary attachment
                    message.BinaryAttachment = Encoding.ASCII.GetBytes(
                        string.Format("Topic to Queue Mapping Tutorial! Message ID: #{0} data{1}", idx, i));
    
                    // Send the message to the queue on the Solace messaging router
                    SetText("Sending message ID " + i + " to topic " + tutorialTopic.Name + " mapped to queue " + queueName + "...");
    
                    ReturnCode returnCode = session.Send(message);
    

    Then I set the ack delay to 7 seconds. As you can see from the following image, the queue data was not deleted and the consumer kept receiving data. That is why I would like to adjust the ack timer: I was guessing that the broker considered the ack late and assumed the consumer had not received the data successfully.

    https://imgur.com/oWEdgAk

    Do you have any suggestions if I don't set the session properties? (In fact, I tried many session properties, and none of them resolved the situation.) Basically, my code follows the sample code from Solace, just separated into Producer and Consumer, and for load balancing purposes I set the access type to NonExclusive.

    AccessType = EndpointProperties.EndpointAccessType.NonExclusive

  • TomF
    TomF Member, Employee Posts: 412 Solace Employee

    You'll need to make sure your Flow (not session) ackmode property is set to client:
    AckMode = MessageAckMode.ClientAck
    But that really should be all you need to do.

    Note that there is some batching of acknowledgements in the API, so you may see a small delay before all messages show as deleted from the queue. If you like, you could increase the API logging level and see when the API sends the acknowledgements.

  • alhung
    alhung Member Posts: 14
    edited December 2020 #7

    Hi @TomF, yes, the AckMode has been ClientAck from the beginning.

    Flow = session.CreateFlow(new FlowProperties()

                {
                    AckMode = MessageAckMode.ClientAck
                },
                queue, null, HandleMessageEvent, HandleFlowEvent);
                Flow.Start();
    

    And I am sure that the queue data was not deleted once the consumer was late to ack, so the broker sent the data again and again.
    Thanks for the suggestion, I will see how to implement it in the code.

  • TomF
    TomF Member, Employee Posts: 412 Solace Employee

    @alhung your comment about the broker re-sending the message is very interesting. The broker will only send the message again if the flow is destroyed, otherwise the message will stay on the queue pending acknowledgement. Is it possible you're destroying the flow before the acknowledgment is sent?

  • alhung
    alhung Member Posts: 14

    Hi @TomF, my code is below. Since I am just trying to figure out how Solace works with Topic to Queue Mapping, I basically followed the sample code (https://github.com/SolaceSamples/solace-samples-dotnet/blob/master/src/TopicToQueueMapping/TopicToQueueMapping.cs) and converted it to a Windows Forms application without big changes,
    and separated it into producer and consumer. I even commented out the cleanup, so it should not destroy the flow.

    //finally

            //{
            //    // Dispose Solace Systems Messaging API
            //    ContextFactory.Instance.Cleanup();
            //}
            //SetText("Finished.");
    

    //Code started from click button

    private void btnSub_Click(object sender, EventArgs e)

        {
            Thread t1 = new Thread(Run);
            t1.Start();
        }
    

    // Initialize Solace Systems Messaging API with logging to console at Debug level

          ContextFactoryProperties cfp = new ContextFactoryProperties()
    
            {
                SolClientLogLevel = SolLogLevel.Debug
            };
            cfp.LogToConsoleError();
            ContextFactory.Instance.Init(cfp);
    
            try
            {
                // Context must be created first
                using (IContext context = ContextFactory.Instance.CreateContext(new ContextProperties(), null))
                {
                ...
                // Create session properties
                    SessionProperties sessionProps = new SessionProperties()
                    {
                        Host = host,
                        VPNName = vpnname,
                        UserName = username,
                        Password = password,
                        ReconnectRetries = DefaultReconnectRetries,
                        IgnoreDuplicateSubscriptionError = true
                    };
    
                    sessionProps.GdWithWebTransport = true;
                    sessionProps.AckEventMode = SessionProperties.AckMode.PER_MSG;
                    sessionProps.ADPublishAckTimerInMsecs = 60000;
    
                    // Connect to the Solace messaging router
                    Console.WriteLine("Connecting as {0}@{1} on {2}...", username, vpnname, host);
                    using (ISession session = context.CreateSession(sessionProps, null, null))
                    {
                        returnCode = session.Connect();
                        if (returnCode == ReturnCode.SOLCLIENT_OK)
                        {
                            Console.WriteLine("Session successfully connected.");
    
                            if (session.IsCapable(CapabilityType.PUB_GUARANTEED) &&
                                session.IsCapable(CapabilityType.SUB_FLOW_GUARANTEED) &&
                                session.IsCapable(CapabilityType.ENDPOINT_MANAGEMENT) &&
                                session.IsCapable(CapabilityType.QUEUE_SUBSCRIPTIONS))
                            {
                                SetText("All required capabilities supported.");
                                TestConsumer(session);
    

    private void TestConsumer(ISession session)

        {
            // Provision the queue
            string queueName = "testqueue3";
    
            // Create the queue
            using (IQueue queue = ContextFactory.Instance.CreateQueue(queueName))
            {
                // Set queue permissions to "consume" and access-type to "non-exclusive"
                EndpointProperties endpointProps = new EndpointProperties()
                {
                    Permission = EndpointProperties.EndpointPermission.Consume,
                    AccessType = EndpointProperties.EndpointAccessType.NonExclusive,
                };
                // Provision it, and do not fail if it already exists
                session.Provision(queue, endpointProps,
                    ProvisionFlag.IgnoreErrorIfEndpointAlreadyExists | ProvisionFlag.WaitForConfirm, null);
                SetText("Queue " + queueName + " has been created and provisioned.");
    
                // Add subscription to the topic mapped to the queue
                ITopic tutorialTopic = ContextFactory.Instance.CreateTopic("T/mapped/topic/sampleMsg");
                session.Subscribe(queue, tutorialTopic, SubscribeFlag.WaitForConfirm, null);
    
                Flow = session.CreateFlow(new FlowProperties()
                {
                    AckMode = MessageAckMode.ClientAck
                },
                queue, null, HandleMessageEvent, HandleFlowEvent);
                Flow.Start();
    
                // block the current thread until a confirmation received
                CountdownEvent.Wait();
            }
        }
    

    private void HandleMessageEvent(object source, MessageEventArgs args)

        {
            // Received a message
            Console.WriteLine("Received message.");
            using (IMessage message = args.Message)
            {
                SetText(Encoding.ASCII.GetString(message.BinaryAttachment));
                // ACK the message
                Thread.Sleep(Convert.ToInt32(this.txtAckdelay.Text));
                //args.Message.AckImmediately = true;
                Flow.Ack(message.ADMessageId);
                //CountdownEvent.Signal();
            }
        }
    

    public void HandleFlowEvent(object sender, FlowEventArgs args)

        {
            // Received a flow event
            SetText("Received Flow Event "+ args.Event + " Type: " + args.ResponseCode.ToString() + " Text: " + args.Info);
        }       
    
  • TomF
    TomF Member, Employee Posts: 412 Solace Employee

    @alhung thanks for the code. Firstly, when you receive a message the message data shouldn't be overwritten, but that's what you're doing with the line:
    SetText(Encoding.ASCII.GetString(message.BinaryAttachment));
    I'm not sure what you're trying to do here, but you're reading the message payload ("Binary Attachment") and then over-writing it.

    Secondly, it would be worth checking the return code from Flow.Ack() - are you getting SOLCLIENT_OK?

    Lastly, have a look at https://docs.solace.com/API-Developer-Online-Ref-Documentation/net/html/3a13818a-ef45-d3ac-6e03-183f97b8822c.htm, particularly:

    Consequently, applications must not block in callback/delegate routines.

    Which, unfortunately, is what you're doing with:
    Thread.Sleep(Convert.ToInt32(this.txtAckdelay.Text));
    What I recommend you do is pass the received message to a data structure inside HandleMessageEvent(). Then in your application thread, read the messages in your data structure, apply your delay and then acknowledge them.
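
    The hand-off described above could look roughly like the sketch below. It is only an illustration under stated assumptions: DeferredAcker, Pending and Enqueue are hypothetical names, not part of the Solace API, and the two delegates stand in for the real processing and for Flow.Ack so that the block compiles without the Solace assembly.

    ```csharp
    using System;
    using System.Collections.Concurrent;
    using System.Threading;

    // Hypothetical helper, not part of the Solace API. It uses no Solace types:
    // the delegates stand in for the real work and for Flow.Ack.
    public sealed class DeferredAcker : IDisposable
    {
        // What the callback hands over: the ack ID plus the payload.
        private sealed class Pending
        {
            public long AdMessageId;
            public byte[] Payload;
        }

        private readonly BlockingCollection<Pending> _queue = new BlockingCollection<Pending>();
        private readonly Thread _worker;

        public DeferredAcker(Action<byte[]> process, Action<long> ack)
        {
            _worker = new Thread(() =>
            {
                // Blocks until an item arrives; ends after CompleteAdding() once the queue is empty.
                foreach (Pending item in _queue.GetConsumingEnumerable())
                {
                    process(item.Payload);   // slow work runs here, off the callback thread
                    ack(item.AdMessageId);   // acknowledge only after the work has finished
                }
            });
            _worker.IsBackground = true;
            _worker.Start();
        }

        // Call from the message callback: enqueue and return immediately.
        public void Enqueue(long adMessageId, byte[] payload)
        {
            _queue.Add(new Pending { AdMessageId = adMessageId, Payload = payload });
        }

        // Stop accepting messages, then wait for the queued ones to be processed and acked.
        public void Dispose()
        {
            _queue.CompleteAdding();
            _worker.Join();
            _queue.Dispose();
        }
    }
    ```

    In the consumer from this thread, HandleMessageEvent would then only call something like acker.Enqueue(message.ADMessageId, message.BinaryAttachment) and return, while the delay and the call to Flow.Ack move into the two delegates. Because the using block disposes the message when the callback returns, the sketch passes on only the ID and the payload, on the assumption that the byte array remains usable after disposal.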

  • alhung
    alhung Member Posts: 14

    Hi @TomF, thanks for the replies.
    The purpose of SetText is just to show the information on the Windows form. I did not overwrite the message: the sample code uses the following line to display the message, and I just use a callback to show it in the UI.
    [Sample code]

    Console.WriteLine("Message content: {0}", Encoding.ASCII.GetString(message.BinaryAttachment));

    [My code]

    SetText(Encoding.ASCII.GetString(message.BinaryAttachment));

    delegate void SetTextCallback(string text);

    private void SetText(string text)

        {
            if (this.txtSubData.InvokeRequired)
            {
                SetTextCallback d = new SetTextCallback(SetText);
                this.Invoke(d, new object[] { text });
            }
            else
            {
                string time = DateTime.Now.ToString();
                this.txtSubData.Text += time + " " + text + "\r\n";
            }
        }
    
  • alhung
    alhung Member Posts: 14

    Secondly, for Flow.Ack, my understanding is that it acks to the broker with the message ID (it looks like a sequence number), so that the broker (Solace) can confirm the consumer got the data and delete it from the queue. The ack message is shown in the image below. SOLCLIENT_OK is only used for connection checking.
    https://imgur.com/undefined

    So, after the consumer subscribed the queue to the topic, it created the flow and waited for messages.

    // Add subscription to the topic mapped to the queue

                ITopic tutorialTopic = ContextFactory.Instance.CreateTopic("T/mapped/topic/sampleMsg");
                session.Subscribe(queue, tutorialTopic, SubscribeFlag.WaitForConfirm, null);
    

    Flow = session.CreateFlow(new FlowProperties()

                {
                    AckMode = MessageAckMode.ClientAck
                },
                queue, null, HandleMessageEvent, HandleFlowEvent);
                Flow.Start();
    
                // block the current thread until a confirmation received
                CountdownEvent.Wait();
    

    Thanks for the link to the IContext interface. So I should not call Thread.Sleep directly to simulate a busy consumer that is late to ack. I may not fully understand the concept of ContextFactory yet, so I need some time to study it.

  • TomF
    TomF Member, Employee Posts: 412 Solace Employee

    Hi @alhung, my misunderstanding of SetText() made me laugh - I'm so wrapped up in the messaging API I just assumed you were calling message.setText!

    Many calls to the Solace API return a success or fail code. In the case of Flow.Ack(), the documentation states that a return other than SOLCLIENT_OK is highly unlikely.
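
    For completeness, checking that return code might look like the sketch below. AckHelper and TryAck are hypothetical names chosen for illustration. The block assumes the Solace .NET assembly is referenced and that Flow.Ack returns a ReturnCode, as the discussion above implies.

    ```csharp
    using System;
    using SolaceSystems.Solclient.Messaging;

    public static class AckHelper
    {
        // Hypothetical helper: acknowledges one message and reports any result
        // other than SOLCLIENT_OK instead of silently ignoring it.
        public static bool TryAck(IFlow flow, long adMessageId)
        {
            ReturnCode rc = flow.Ack(adMessageId);
            if (rc == ReturnCode.SOLCLIENT_OK)
            {
                return true;
            }
            Console.Error.WriteLine("Flow.Ack({0}) returned {1}", adMessageId, rc);
            return false;
        }
    }
    ```

    With this in place, a line such as Flow.Ack(message.ADMessageId) would become AckHelper.TryAck(Flow, message.ADMessageId), and anything unexpected would show up on the error console.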

    I'd agree that understanding the idea of Context and Session is important. In essence, the Context is what drives the network (TCP socket) interface, so all network activity happens on the Context thread. The callbacks/delegates (in this case the message receive callback, HandleMessageEvent) run in the Context thread. If you call sleep in your callback, the API can no longer send or receive data, including acknowledgements.
