C# - Solace Queue Message not reading and missing sometime

murusolai
murusolai Member Posts: 2
edited May 2023 in PubSub+ Event Broker #1

I have referred sample code provided by the Solace team for implemented Pub/Sub Queue message (https://github.com/SolaceSamples/solace-samples-dotnet)

Sample code is implemented part of Microsoft .Net 6 Worker service and scheduled to run every 30 mins. I am facing new issue and i really don't know what went wrong in my piece code. working completely inconsistent and i don't find any other reference or technical support. Following issues are occurring

Scenario: I have 10 different Queues for subscribe message and each queue will have at least 20 message available( larger json formatted) which mean 10 Queue X 20 Message = 200 Message to download(subscribe) every scheduled run.

Issue 1: All the Queue has a message(20 each) and downloaded all messages from 8 Queue and 2 other Queue incomplete in first run, if i rerun second time, all the pending message from those 2 queue also downloaded.

in simple, all messages are not downloading at one go.

Issue 2: its slightly different than 1st issue. All the Queue has a message(20 each) and downloaded all messages from 7 Queue and 3 other Queue incomplete. even if i rerun multiple time, pending message still remain in solace Queue and there is no error or log to figure out the issue.

To simplify, few messages permanently stuck in solace and unable to download forever.


Issue 3: few occasions, I have multiple message in particular Queue. Mostly all the message downloaded one go. However, sometime only one message getting download per scheduled run.

Which mean, every scheduled run only one message subscribed at a time and not all the message.

Issue 4: few occasions, Messages missing in solace. which mean, i didn't acknowledge the message however its missing from Queue.


Note: The code that is implemented in .Net Worker service, i slightly tweaked to Asynchronous call. Also, same piece code I implemented in AWS Lambda using C# and all above issue remain same.


QueueName1: HNK/MTC/PROPOSAL/PROPOSAL/PROPOSAL/3.0.0/LMDM/UPD/APAC/VN/OND/CUSTOM

QueueName2: HNK/STP/PRODUCT/MATERIAL/FINISHED/3.0.0/LMDM/UPD/APAC/VN/OND/CUSTOM

QueueName3: HNK/MTC/SALES/SALESORG/SALESORG/1.0.0/LMDM/UPD/APAC/VN/OND/CUSTOM



using System;
using System.Text;
using SolaceSystems.Solclient.Messaging;
using System.Threading;

namespace Lambda_Queue_Service
{
    public class SolaceSubscribeMessage : IDisposable
    {
        string VPNName { get; set; }
        string UserName { get; set; }
        string Password { get; set; }

        private bool disposedValue = false;
        private static string folderName = "data";

        private ISession Session = null;
        private IQueue Queue = null;
        private IFlow Flow = null;
        private EventWaitHandle WaitEventWaitHandle = new AutoResetEvent(false);


        public static async Task SubScribeMessage(string Host, string VPNName, string UserName, string Password, string queueName, string TemplateCode)
        {

            string host = Host;
            string username = UserName;
            string vpnname = VPNName;
            string password = Password;

            ContextFactoryProperties cfp = new ContextFactoryProperties()
            {
                SolClientLogLevel = SolLogLevel.Warning
            };
            cfp.LogToConsoleError();
            ContextFactory.Instance.Init(cfp);

            try
            {
                using (IContext context = ContextFactory.Instance.CreateContext(new ContextProperties(), null))
                {
                    using (SolaceSubscribeMessage queueConsumer = new SolaceSubscribeMessage()
                    {
                        VPNName = vpnname,
                        UserName = username,
                        Password = password
                    })
                    {
                        await queueConsumer.Run(context, host, queueName, TemplateCode);
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception thrown: {0} - {1}", ex.Message, TemplateCode);
            }
            finally
            {
                // Dispose Solace Systems Messaging API
                ContextFactory.Instance.Cleanup();
            }
        }


        async Task Run(IContext context, string host, string queueName, string TemplateCode)
        {

            folderName = TemplateCode;
            SessionProperties sessionProps = new SessionProperties()
            {
                Host = host,
                VPNName = VPNName,
                UserName = UserName,
                Password = Password,
                ReconnectRetries = 10,
                SSLValidateCertificate = false,
                SdkBufferSize = 8000000,
                SocketSendBufferSizeInBytes = 8000000,
                SocketReceiveBufferSizeInBytes = 8000000,
                ConnectTimeoutInMsecs = 300,
                ConnectRetries = 10

            };

            Session = context.CreateSession(sessionProps, HandleMessageEvent, null);
            ReturnCode returnCode = Session.Connect();
            if (returnCode == ReturnCode.SOLCLIENT_OK)
            {
                EndpointProperties endpointProps = new EndpointProperties()
                {
                    Permission = EndpointProperties.EndpointPermission.Consume,
                    AccessType = EndpointProperties.EndpointAccessType.Exclusive
                };

                Queue = ContextFactory.Instance.CreateQueue(queueName);

                Session.Provision(Queue, endpointProps,
                    ProvisionFlag.IgnoreErrorIfEndpointAlreadyExists | ProvisionFlag.WaitForConfirm, null);

                Flow = Session.CreateFlow(new FlowProperties()
                {
                    AckMode = MessageAckMode.ClientAck
                },
                Queue, null, HandleMessageEvent, HandleFlowEvent);
                Flow.Start();
                WaitEventWaitHandle.WaitOne(5000);
            }
            else
            {
                Console.WriteLine("Error connecting, return code: {0}", returnCode);
            }
        }
        private async void HandleMessageEvent(object source, MessageEventArgs args)
        {
            using (IMessage message = args.Message)
            {
                Console.WriteLine(folderName + "- Message - " + DateTime.Now.ToString());
                string responseMsg = Encoding.ASCII.GetString(message.BinaryAttachment);
                Guid guid = Guid.NewGuid();
                AmazonS3Utility s3Utility = new AmazonS3Utility();
                await s3Utility.UploadFileAsync(responseMsg, string.Format("{0}_data.json", guid), folderName); // Upload the message to AWS S3
                Flow.Ack(message.ADMessageId);
                WaitEventWaitHandle.Set();
            }
        }
        public void HandleFlowEvent(object sender, FlowEventArgs args)
        {
        }


        protected virtual void Dispose(bool disposing)
        {
            if (!disposedValue)
            {
                if (disposing)
                {
                    if (Session != null)
                    {
                        Session.Dispose();
                        Session = null;
                    }
                    if (Queue != null)
                    {
                        Queue.Dispose();
                        Queue = null;
                    }
                    if (Flow != null)
                    {
                        Flow.Dispose();
                        Flow = null;
                    }
                }
                disposedValue = true;
            }
        }

        public void Dispose()
        {
            Dispose(true);
        }

    }

}


Answers

  • nicholasdgoodman
    nicholasdgoodman Member, Employee Posts: 43 Solace Employee

    Greetings @murusolai and welcome to the community!

    I am not sure what may be causing all of the issues you describe, but I do identify one issue in the provided code that will cause the first one. I observe that the .Run(...) method is blocking on WaitEventWaitHandle.WaitOne(5000) - which has two meanings:

    • The code will exit as soon as soon as WaitEventWaitHandle.Set() is called at least once
    • The code will exit after a maximum timeout of 5 seconds, regardless of whether all the messages have been processed yet or not

    This would explain issue 1: messages begin coming down the flow, are asynchronously processed, but once the first message has completed upload the wait handle is set and the function is executing before processing all the messages in the queue.

    If the desired behavior is to process all messages in a given queue, and then shutdown if no messages are received within a 5 second hold-down time, please see the following modifications you can make to your code:

    Add two additional fields to your SolaceSubscribeMessage class:

    private const int MessageReceiveTimeout = 5000;
    private Timer MessageReceiveTimer;
    

    Then add the following two methods to the same class:

    private void ResetMessageReceiveTimer()
    {
        MessageReceiveTimer.Change(MessageReceiveTimeout, Timeout.Infinite);
    }
    
    
    private void OnMessageReceiveTimeout(object state)
    {
        WaitEventWaitHandle.Set();
    }
    

    And finally, to leverage this code together, you need to modify the Run(...) method itself as follows:

    // ... other initialization code goes above here...
    
    Flow = Session.CreateFlow(new FlowProperties()
    {
        AckMode = MessageAckMode.ClientAck
    },
    Queue, null, HandleMessageEvent, HandleFlowEvent);
    Flow.Start();
    
    MessageReceiveTimer = new Timer(OnMessageReceiveTimeout);
    ResetMessageReceiveTimer();
    WaitEventWaitHandle.WaitOne();
    

    With this implementation, the Run(...) method has a timer that will fire off after 5 seconds. Every time a message is received the timer is reset, and after no more messages come down the flow, the time will expire and signal the wait handle that processing is complete.

    As-written, this construct is per-flow, but could be extended to all flows within a single process by declaring the Timer and ManualResetEvent fields static and making appropriate adjustments.

    Please try out these changes and let us know what other issues are still observed.

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 644 admin
    edited May 2023 #3

    Hi @murusolai, welcome to the Community!

    I'm not a .NET expert like @nicholasdgoodman, but I know a few things about our API that I'd just like to point out / mention...

    First, I'm wondering why you are changing some of the default API properties, specifically for:

    SdkBufferSize = 8000000,
    SocketSendBufferSizeInBytes = 8000000,
    SocketReceiveBufferSizeInBytes = 8000000,
    ConnectTimeoutInMsecs = 300,
    

    Are you just experimenting with different values to see if they help? Note: 300ms is VERY aggressive for a connection timeout setting (default 3000ms).

    Next, in your message handler, you are doing:

    Console.WriteLine(folderName + "- Message - " + DateTime.Now.ToString());
    string responseMsg = Encoding.ASCII.GetString(message.BinaryAttachment);
    Guid guid = Guid.NewGuid();
    AmazonS3Utility s3Utility = new AmazonS3Utility();
    await s3Utility.UploadFileAsync(responseMsg, string.Format("{0}_data.json", guid), folderName); // Upload the message to AWS S3
    Flow.Ack(message.ADMessageId);
    WaitEventWaitHandle.Set();
    

    Now, in C and C# APIs, the thread that runs the "on message" event handler is an API thread, the Context thread, not an application-owned thread. And this is the only thread the API has for performing all other API-related functions (e.g. reading the socket, running timers, etc.). It is very strongly discouraged from doing any significant amount of work in your callback for this reason. And in your case, you are trying to do a blocking send to an S3 cloud service. This can have significant issues on the API, including forcing a disconnect due to keepalive timers expiring due to the Context thread being blocked for too long.

    There is more information here: https://docs.solace.com/API/API-Developer-Guide/Creating-Contexts.htm#initializing_and_monitoring_apis_2715568632_602452 and here: https://docs.solace.com/API/API-Developer-Guide/API-Threading.htm

    Ideally, your callback will simply take the IMessage to be processed, stick it into some internal processing queue/linked list/ring buffer and return. And you'd have other application-owned threads that are blocking on that data structure, pulling messages out, processing them to S3, and then ACKing and Dispose()ing them when complete.

    If the broker is sending you more data than your app can take in, there is always the IFlow.Start() and Stop() methods to temporarily pause message delivery.

    Hope that helps!? 👍🏼

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 644 admin
    edited May 2023 #4

    BTW, the fact that you're creating a new Context for each Session (connection) and each Flow means that you partially avoided some very bad behaviour. Because otherwise each of your Sessions would be sharing the same Context, and therefore the same thread. And so all 10 of your queues' Flows' callbacks would be on the same thread. So posting to S3 would be completely serialized.

    For consideration, if you wanted to build this all in one app, here's how I might do it. I'll explain using Java since it's similar enough for this explanation.

    One Context, One Session (connection). Multiple IFlows, each bound to their particular queue. The callback simply takes the message received and places it in a BlockingQueue / BlockingCollection and returns, does not ACK. Callback == nice and fast now. Then you can have multiple worker threads bound to this BlockingCollection object and POSTing to S3. Something similar to this:https://www.youtube.com/watch?v=5ilYW2VqTqQ

    If you are concerned about ordering, then you could have 10 BlockingCollections, one for each queue, and one worker thread processing from each. This ensures messages are stuck into S3 in correct order... if that matters to you.

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 644 admin

    Haha one more final thought! You don't even need to use .NET or a microservice to post data in S3. Using our REST integration capabilities (Rest Delivery Points), you can have the broker POST/PUT messages directly from your queue into S3. I made a video showing exactly this: https://www.youtube.com/watch?v=INMrSL2fA3E

    Troubleshooting RDPs is sometimes a bit tricky. I made this utility to help grab RDP error logs out of the broker, in case you're having some issues: https://sg.solace.com/aaron/rdp/

  • murusolai
    murusolai Member Posts: 2
    edited May 2023 #6

    @nicholasdgoodman Thanks for your valuable input, i changed a code as per your suggestion and still facing issue. All the message from one particular Queue get stuck and its not getting download.

    "HandleMessageEvent" - this event is not triggering at all.

    I tried 2 approach,

    Approach 1. Uploaded 20 message in 3 different Queue(Each) and run the lambda service. All the message got downloaded from 2 Queue and 1 Queue's message fully stuck.

    Approach 2. Uploaded 20 message to the only one particular Queue which got stuck in previous approach. None of the message getting download.

    So, specific one Queue message not downloading entirely. Json size is very smaller but still not reading the message. Please find an updated code below, i am completely fed up with Solace Queue. Request your support to resolve the issue.

    Note: Sample Json file is attached in original post, same data i am trying to download from solace. you may use for the investigation if required.


    using System.Text;
    using SolaceSystems.Solclient.Messaging;
    
    namespace Lambda_Queue_Service
    {
        public class SolaceSubscribeMessage : IDisposable
        {
            string VPNName { get; set; }
            string UserName { get; set; }
            string Password { get; set; }
    
            private bool disposedValue = false;
            private static string folderName = "data";
    
            private ISession Session = null;
            private IQueue Queue = null;
            private IFlow Flow = null;
            private EventWaitHandle WaitEventWaitHandle = new AutoResetEvent(false);
            private const int MessageReceiveTimeout = 5000;
            private Timer MessageReceiveTimer;
    
            public SolaceSubscribeMessage()
            {
    
            }
            public void SubScribeMessage(string Host, string VPNName, string UserName, string Password, string queueName, string TemplateCode)
            {
                string host = Host;
                string username = UserName;
                string vpnname = VPNName;
                string password = Password;
    
                ContextFactoryProperties cfp = new ContextFactoryProperties()
                {
                    SolClientLogLevel = SolLogLevel.Warning
                };
                cfp.LogToConsoleError();
                ContextFactory.Instance.Init(cfp);
    
                try
                {
                    using (IContext context = ContextFactory.Instance.CreateContext(new ContextProperties(), null))
                    {
                        using (SolaceSubscribeMessage queueConsumer = new SolaceSubscribeMessage()
                        {
                            VPNName = vpnname,
                            UserName = username,
                            Password = password
                        })
                        {
                            queueConsumer.Run(context, host, queueName, TemplateCode);
                        }
                    }
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Exception thrown: {0} - {1}", ex.Message, TemplateCode);
                }
                finally
                {
                    ContextFactory.Instance.Cleanup();
                }
            }
            void Run(IContext context, string host, string queueName, string TemplateCode)
            {
    
    
                folderName = TemplateCode;
                SessionProperties sessionProps = new SessionProperties()
                {
                    Host = host,
                    VPNName = VPNName,
                    UserName = UserName,
                    Password = Password,
                    ReconnectRetries = 3,
                    SSLValidateCertificate = false
                };
    
                Session = context.CreateSession(sessionProps, HandleMessageEvent, null);
                ReturnCode returnCode = Session.Connect();
                if (returnCode == ReturnCode.SOLCLIENT_OK)
                {
                    EndpointProperties endpointProps = new EndpointProperties()
                    {
                        Permission = EndpointProperties.EndpointPermission.Consume,
                        AccessType = EndpointProperties.EndpointAccessType.Exclusive
                    };
    
                    Queue = ContextFactory.Instance.CreateQueue(queueName);
    
                    Session.Provision(Queue, endpointProps,
                        ProvisionFlag.IgnoreErrorIfEndpointAlreadyExists | ProvisionFlag.WaitForConfirm, null);
    
                    Flow = Session.CreateFlow(new FlowProperties()
                    {
                        AckMode = MessageAckMode.ClientAck
                    },
                    Queue, null, HandleMessageEvent, HandleFlowEvent);
                    Flow.Start();
                    MessageReceiveTimer = new Timer(OnMessageReceiveTimeout);
                    ResetMessageReceiveTimer();
                    WaitEventWaitHandle.WaitOne();
                }
                else
                {
                    Console.WriteLine("Error connecting, return code: {0}", returnCode);
                }
            }
            private  void HandleMessageEvent(object source, MessageEventArgs args)
            {
                using (IMessage message = args.Message)
                {
                    Console.WriteLine(folderName + "- Message - " + DateTime.Now.ToString());
                    string responseMsg = Encoding.ASCII.GetString(message.BinaryAttachment);
                    Guid guid = Guid.NewGuid();
                    AmazonS3Utility s3Utility = new AmazonS3Utility();
                     s3Utility.UploadFileAsync(responseMsg, string.Format("{0}_data.json", guid), folderName);
                    Flow.Ack(message.ADMessageId);
                    WaitEventWaitHandle.Set();
                }
            }
            
            public void HandleFlowEvent(object sender, FlowEventArgs args)
            {
            }
            private void ResetMessageReceiveTimer()
            {
                MessageReceiveTimer.Change(MessageReceiveTimeout, Timeout.Infinite);
            }
            private void OnMessageReceiveTimeout(object state)
            {
                WaitEventWaitHandle.Set();
            }
    
            protected virtual void Dispose(bool disposing)
            {
                if (!disposedValue)
                {
                    if (disposing)
                    {
                        if (Session != null)
                        {
                            Session.Dispose();
                            Session = null;
                        }
                        if (Queue != null)
                        {
                            Queue.Dispose();
                            Queue = null;
                        }
                        if (Flow != null)
                        {
                            Flow.Dispose();
                            Flow = null;
                        }
                    }
                    disposedValue = true;
                }
            }
            public void Dispose()
            {
                Dispose(true);
            }
        }
    }
    
    


  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 644 admin

    Hi @murusolai ... you haven't implemented a number of changes we suggested:

    • You're still making a blocking call to S3 in the message event handler.
    • You're saying that HandleMessageEvent is not getting called at all? But you also say that 2 of 3 queues get emptied?
    • Please provide API documentation for AmazonS3Utility and UploadFileAsync. I can't find it publicly.
    • Please add some logging to the HandleFlowEvent callback as well, rather than just silently ignore.
    • Please show us your console output at least.
    • More importantly: please set your API logging to DEBUG and provide a full dump of API logs. See: https://docs.solace.com/API/API-Developer-Guide/Configuring-Logging.htm#C-and-Net-api
  • Ashwinikumar
    Ashwinikumar Member Posts: 1

    Hi AAron ,what will be the effect of giving "HandleMessageEvent" both in session and flow, as he given in above code.

    Session = context.CreateSession(sessionProps, HandleMessageEvent, null);
    Flow = Session.CreateFlow(new FlowProperties()
    {
    AckMode = MessageAckMode.ClientAck
    },
    Queue, null, HandleMessageEvent, HandleFlowEvent);

  • nicholasdgoodman
    nicholasdgoodman Member, Employee Posts: 43 Solace Employee

    @Ashwinikumar the effect of doing so is twofold:

    Session-level messages (such as those from topic-based subscriptions) will go to the general session message event handler if the subscription was made without specifying an IDispatchTarget or TargetDispatch session option was set to false.

    The flow-level message event handler will fire whenever that specific source queue receives a message; and messages from flows will not trigger the session-level message handler.

    There are no side effects of using the same handler for both; however, the code would need to differentiate where the message came from. This could be done by casting the Object sender argument to either an IFlow or ISession instance, and retrieving the message source accordingly.

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 644 admin

    Well, either the message source or the message's destination? I.e. what topic was the message published to. But yeah, usually better/easier to handle messages in different callbacks as you know they're coming from different sources. (default session-level callback for Direct messages, or a per-queue callback).

    Note that all of these callbacks are fired on the same API-owned Context thread. In C#, you cannot block that thread for any processing (unless super super fast), and messages should be handed off to an application-owned thread for further processing.

  • allmhhuran
    allmhhuran Member Posts: 47 ✭✭✭

    To provide some more C# specific context around writing a good message event handler:

    In the HandleMessageEvent callback, just write the message to some kind of collection. I strongly recommend a System.Threading.Channel, using the CreateUnbounded factory. Then have a separate task which loops (forever - until cancelled via a cancellation token if required) over the channel output via its IAsyncEnumerable - ie, await foreach (var msg in myUnboundedChannel.ReadAllAsync())