Why is the flow in the HandleMessageEvent sometimes null?

Options
Jordi
Jordi Unconfirmed, Member Posts: 11

Hi,

we implemented a Solace consumer using the .NET API following the guidelines (https://tutorials.solace.dev/dotnet/persistence-with-queues/).

When runnung some tests we figured that sometimes messages were not properly acknowledged and removed from the queue.

When debugging the HandleMessageEvent we found that apparently, sometimes, when calling

Flow.Ack(message.ADMessageId)

Flow was null, especially, when we created several instances of the Consumer. This also happens when we properly Dispose the Consumer before creating a new:

public void Dispose()
    {
      Flow.Stop();
      Flow.Dispose();
      Session.Disconnect();
      Session.Dispose();
      Context.Dispose();
      ContextFactory.Instance.Cleanup();
    }

In the flow properties we only set

AckMode = MessageAckMode.ClientAck

Any idea, what might be the problem here?

Thanks, Jordi

Best Answer

  • TomF
    TomF Member, Employee Posts: 406 Solace Employee
    #2 Answer ✓
    Options

    Hi @Jordi, ah, I see! The exception is thrown because the flow is null.

    Back to @Aaron's comment, your .dispose() is rather brutal. Not only are you closing the flow, you are also disconnecting (and disposing of) the session. Unless you have very good performance reasons, use the same session for all your flows: opening a session for every flow like this is very expensive. What's more, you're then disposing of the context - again, this is uncessary.

    This is speculation, but what might be happening is some kind of race condition between consumer1 being closed, consumer 2 starting, then the underlying session or context being disposed.

    I recommend trying the 1 session, 1 context approach, and seeing if that fixes your problem.

Answers

  • TomF
    TomF Member, Employee Posts: 406 Solace Employee
    Options

    Hi @Jordi,

    Are you seeing any FlowEvents? Specifically, what are you doing in the HandleFlowEvent code? In the sample, it prints "Received Flow Event..." - are you seeing anything printed out like that? It could be that the flow has been disposed: you should see events, for instance that the flow is being closed.

    Do you check the flows are created properly? For instance, are you checking createFlow returns non-null?

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 531 admin
    Options

    I'd also recommend not creating multiple Consumers, or re-creating them. Just create one-per-Session when first initializing your program and that's it.

  • Jordi
    Jordi Unconfirmed, Member Posts: 11
    Options

    Hi TomF and Aron,

    thanks for your comments. Yes, I check FlowEvents and flows seems to be properly created. I also create one flow per session. Here is what I do:

    The consumer looks the following:

     public SolaceConsumer(int id, string selectorName = null, string selectorValue = null)
        {
          _id = id;´// for debugging
          Console.WriteLine($"Starting consumer {_id}");
    
          ContextFactoryProperties cfp = new ContextFactoryProperties()
                          {
                            SolClientLogLevel = SolLogLevel.Warning
                          };
          cfp.LogToConsoleError();
          ContextFactory.Instance.Init(cfp);
    
    
          X509Store store = new X509Store(StoreName.Root, StoreLocation.LocalMachine);
          store.Open(OpenFlags.OpenExistingOnly | OpenFlags.ReadOnly);
          X509Certificate2Collection certs = store.Certificates;
          store.Close();
    
          SessionProperties sessionProps = new SessionProperties()
                           {
                             Host = Resources.JNDI_PROVIDER_URL,
                             VPNName = Resources.SOLACE_JMS_VPN,
                             ReapplySubscriptions = true,
                             UserName = null,
                             ReconnectRetries = 5,
                             SSLValidateCertificate = true,
                             SSLValidateCertificateDate = true,
                             AuthenticationScheme = AuthenticationSchemes.CLIENT_CERTIFICATE,
                             SSLClientCertificateFile = Resources.CERTIFICATE_FILE,
                             SSLClientPrivateKeyFile = Resources.CERTIFICATE_KEY,
                             SSLTrustStore = certs
          };
    
          _context = ContextFactory.Instance.CreateContext(new ContextProperties(), null);
          _session = _context.CreateSession(sessionProps, MessageEventHandler, null);
          IQueue queue = ContextFactory.Instance.CreateQueue(Resources.QUEUE);
    
          FlowProperties flowProperties = new FlowProperties()
                          {
                            AckMode = MessageAckMode.ClientAck
                          };
    
          if(!(selectorName is null || selectorValue is null)) flowProperties.Selector = $"{selectorName} = '{selectorValue}'";
    
          ReturnCode returnCode = _session.Connect();
          if (returnCode == ReturnCode.SOLCLIENT_OK)
          {
            try
            {
              _flow = _session.CreateFlow(flowProperties, queue, null, MessageEventHandler, HandleFlowEvent);
            }
            catch (Exception e)
            {
              Console.WriteLine(e.Message);
            }
          }
        }
    
        public void Dispose()
        {
          Console.WriteLine($"Disposing consumer {_id}");
          _flow.Stop();
          _flow.Dispose();
          _session.Disconnect();
          _session.Dispose();
          _context.Dispose();
          ContextFactory.Instance.Cleanup();
        }
    
        private void HandleFlowEvent(object sender, FlowEventArgs args)
        {
           Console.WriteLine($"Consumer {_id}: {args.Event.ToString()} {args.Info}");
        }
    
        private void MessageEventHandler(object source, MessageEventArgs args)
        {
          IMessage msg = args.Message;
          string content = Encoding.ASCII.GetString(msg.BinaryAttachment ?? msg.XmlContent);
          ReceivedMessages.Add(new KeyValuePair<string, IDestination>(content, msg.Destination));
          if (true)
          {
            if (_flow is null)
            {
              Console.WriteLine($"Flow {_id} is null");
            }
            ReturnCode returnCode = _flow.Ack(msg.ADMessageId);
            if (returnCode == ReturnCode.SOLCLIENT_OK) msg.Dispose();
          }
        }
      }
    
    

    Then I test, e.g., that the selector works:

          // write two messages to the queue first, one with and one without selector
          // create a selective listener
          SolaceConsumer selectiveConsumer = new SolaceConsumer(1, selectorName, selectorValue);
          // wait to receive and acknowledge message
          await Task.Delay(TimeSpan.FromSeconds(2));
    
          // the selective listener picks up only one message
          Assert.AreEqual(1, selectiveConsumer.ReceivedMessages.Count);
          // close selective listener
          selectiveConsumer.Dispose();
    
          // write a message with selector
          writer.WriteMessage("Message 3", selectorName, selectorValue);
    
          // create a non-selective listener
          SolaceConsumer nonSelectiveConsumer = new SolaceConsumer(2);
          // wait to receive and acknowledge message
          await Task.Delay(TimeSpan.FromSeconds(2));
          // the non-selective listener picks up all messages
          Assert.AreEqual(2, nonSelectiveConsumer.ReceivedMessages.Count);
          nonSelectiveConsumer.Dispose();
    

    What I get as output on the console is, e.g.:

    Starting consumer 1
    Consumer 1: UpNotice  OK
    Disposing consumer 1
    Consumer 1: ParentSessionDown  Session for Flow disconnected
    Starting consumer 2
    Consumer 2: UpNotice  OK
    Flow 2 is null
    Disposing consumer 2
    Consumer 2: ParentSessionDown  Session for Flow disconnected
    

    For consumer 2 the flow becomes null and, therefore, the messages are not properly acknowledged. Sometimes the flow becomes null for the first consumer. Seems random.

    I also get a warning:

        Warning - FlowImpl: 09:10:18	|	Client Application threw an exception in flow rx message callback	|	System.NullReferenceException: Object reference not set to an instance of an object.
       at SolaceConsumer.MessageEventHandler(Object source, MessageEventArgs args) in C:\...\SolaceConsumer.cs:line 139
       at SolaceSystems.Solclient.Messaging.Native.FlowImpl.Flow_RxCallbackFuncV2(IntPtr opaqueFlow, IntPtr opaqueMessage, IntPtr user)
    

    So, any idea, why the flow becomes null?

  • TomF
    TomF Member, Employee Posts: 406 Solace Employee
    Options

    Hi @Jordi, I strongly suspect that your exception is the problem. The exception is thrown in the message receive delegate. I'm not 100% sure, but I'm confident that this will cause the API to shut down the flow.

    The reason is that the API knows the RX delegate is broken (because there has been an exception), so messages won't be processed properly. To prevent endless exceptions, the flow is terminated. I haven't tried this in .Net, but that's what the JAva API does.

  • Jordi
    Jordi Unconfirmed, Member Posts: 11
    Options

    Hi TomT,

    I am not sure what you mean.

    In the MessageEventHandler I do not handle any exception. There are just conditional statements. On the contrary, an exception is thrown, when the flow becomes null. The exception in the constructor is never thrown.

    My hypothesis is that the Solace client has problems when multiple consumers (session, flows, etc) for the same endpoint are created and disposed in a short time. Maybe the context or session caches something and then the flow assignment become ambiguous.

    Usually, this is not the case I assume, only here in this test instance.

    best,

    Jörg

  • TomF
    TomF Member, Employee Posts: 406 Solace Employee
    Options

    @Jordi, what I mean is that I think your MessageEventHandler code is throwing an exception somewhere, and you're not catching it. That's what the line:

    Client Application threw an exception in flow rx message callback	
    

    in the log means.

  • Jordi
    Jordi Unconfirmed, Member Posts: 11
    Options

    When I put the whole message handler in a try-catch block, the only exception catched is when the flow becomes null and I try to acknowledge the message.

  • TomF
    TomF Member, Employee Posts: 406 Solace Employee
    #10 Answer ✓
    Options

    Hi @Jordi, ah, I see! The exception is thrown because the flow is null.

    Back to @Aaron's comment, your .dispose() is rather brutal. Not only are you closing the flow, you are also disconnecting (and disposing of) the session. Unless you have very good performance reasons, use the same session for all your flows: opening a session for every flow like this is very expensive. What's more, you're then disposing of the context - again, this is uncessary.

    This is speculation, but what might be happening is some kind of race condition between consumer1 being closed, consumer 2 starting, then the underlying session or context being disposed.

    I recommend trying the 1 session, 1 context approach, and seeing if that fixes your problem.

  • Jordi
    Jordi Unconfirmed, Member Posts: 11
    Options

    Hi TomF,

    yes, some sort of race condition is our suspition too,

    Thanks for your hint not to open one session per flow. Usually, this is fine for us, as we do not open several sessions one after the other, except in this situation, when we run multiple test. I will change the code in tht respect.

    Best,

    Jordi

  • TomF
    TomF Member, Employee Posts: 406 Solace Employee
    Options

    Hi @Jordi,

    Thanks for keeping us up to date. If that suggestion did fix your problem, please let us know!

  • Jordi
    Jordi Unconfirmed, Member Posts: 11
    Options

    Just a short follow up: indeed, when we just open one session and multipe flows. the problem of flows becoming null is gone. Thanks again for this fruitful discussion.