Why is the flow in the HandleMessageEvent sometimes null?
Hi,
we implemented a Solace consumer using the .NET API following the guidelines (https://tutorials.solace.dev/dotnet/persistence-with-queues/).
When running some tests we noticed that sometimes messages were not properly acknowledged and removed from the queue.
When debugging the HandleMessageEvent we found that, sometimes, when calling
Flow.Ack(message.ADMessageId)
Flow was null, especially when we created several instances of the Consumer. This also happens when we properly Dispose the Consumer before creating a new one:
public void Dispose()
{
    Flow.Stop();
    Flow.Dispose();
    Session.Disconnect();
    Session.Dispose();
    Context.Dispose();
    ContextFactory.Instance.Cleanup();
}
In the flow properties we only set
AckMode = MessageAckMode.ClientAck
Any idea what might be the problem here?
Thanks, Jordi
Answers
-
Hi @Jordi,
Are you seeing any FlowEvents? Specifically, what are you doing in the HandleFlowEvent code? In the sample, it prints "Received Flow Event..." - are you seeing anything printed out like that? It could be that the flow has been disposed: you should see events, for instance that the flow is being closed. Do you check that the flows are created properly? For instance, are you checking that CreateFlow returns non-null?
1 -
Hi TomF and Aaron,
thanks for your comments. Yes, I check FlowEvents, and the flows seem to be properly created. I also create one flow per session. Here is what I do:
The consumer looks as follows:
public SolaceConsumer(int id, string selectorName = null, string selectorValue = null)
{
    _id = id; // for debugging
    Console.WriteLine($"Starting consumer {_id}");

    ContextFactoryProperties cfp = new ContextFactoryProperties()
    {
        SolClientLogLevel = SolLogLevel.Warning
    };
    cfp.LogToConsoleError();
    ContextFactory.Instance.Init(cfp);

    X509Store store = new X509Store(StoreName.Root, StoreLocation.LocalMachine);
    store.Open(OpenFlags.OpenExistingOnly | OpenFlags.ReadOnly);
    X509Certificate2Collection certs = store.Certificates;
    store.Close();

    SessionProperties sessionProps = new SessionProperties()
    {
        Host = Resources.JNDI_PROVIDER_URL,
        VPNName = Resources.SOLACE_JMS_VPN,
        ReapplySubscriptions = true,
        UserName = null,
        ReconnectRetries = 5,
        SSLValidateCertificate = true,
        SSLValidateCertificateDate = true,
        AuthenticationScheme = AuthenticationSchemes.CLIENT_CERTIFICATE,
        SSLClientCertificateFile = Resources.CERTIFICATE_FILE,
        SSLClientPrivateKeyFile = Resources.CERTIFICATE_KEY,
        SSLTrustStore = certs
    };

    _context = ContextFactory.Instance.CreateContext(new ContextProperties(), null);
    _session = _context.CreateSession(sessionProps, MessageEventHandler, null);
    IQueue queue = ContextFactory.Instance.CreateQueue(Resources.QUEUE);

    FlowProperties flowProperties = new FlowProperties()
    {
        AckMode = MessageAckMode.ClientAck
    };
    if (!(selectorName is null || selectorValue is null))
        flowProperties.Selector = $"{selectorName} = '{selectorValue}'";

    ReturnCode returnCode = _session.Connect();
    if (returnCode == ReturnCode.SOLCLIENT_OK)
    {
        try
        {
            _flow = _session.CreateFlow(flowProperties, queue, null, MessageEventHandler, HandleFlowEvent);
        }
        catch (Exception e)
        {
            Console.WriteLine(e.Message);
        }
    }
}

public void Dispose()
{
    Console.WriteLine($"Disposing consumer {_id}");
    _flow.Stop();
    _flow.Dispose();
    _session.Disconnect();
    _session.Dispose();
    _context.Dispose();
    ContextFactory.Instance.Cleanup();
}

private void HandleFlowEvent(object sender, FlowEventArgs args)
{
    Console.WriteLine($"Consumer {_id}: {args.Event.ToString()} {args.Info}");
}

private void MessageEventHandler(object source, MessageEventArgs args)
{
    IMessage msg = args.Message;
    string content = Encoding.ASCII.GetString(msg.BinaryAttachment ?? msg.XmlContent);
    ReceivedMessages.Add(new KeyValuePair<string, IDestination>(content, msg.Destination));
    if (true)
    {
        if (_flow is null)
        {
            Console.WriteLine($"Flow {_id} is null");
        }
        ReturnCode returnCode = _flow.Ack(msg.ADMessageId);
        if (returnCode == ReturnCode.SOLCLIENT_OK)
            msg.Dispose();
    }
}
}
Then I test, e.g., that the selector works:
// write two messages to the queue first, one with and one without selector

// create a selective listener
SolaceConsumer selectiveConsumer = new SolaceConsumer(1, selectorName, selectorValue);
// wait to receive and acknowledge message
await Task.Delay(TimeSpan.FromSeconds(2));
// the selective listener picks up only one message
Assert.AreEqual(1, selectiveConsumer.ReceivedMessages.Count);
// close selective listener
selectiveConsumer.Dispose();

// write a message with selector
writer.WriteMessage("Message 3", selectorName, selectorValue);

// create a non-selective listener
SolaceConsumer nonSelectiveConsumer = new SolaceConsumer(2);
// wait to receive and acknowledge message
await Task.Delay(TimeSpan.FromSeconds(2));
// the non-selective listener picks up all messages
Assert.AreEqual(2, nonSelectiveConsumer.ReceivedMessages.Count);
nonSelectiveConsumer.Dispose();
What I get as output on the console is, e.g.:
Starting consumer 1
Consumer 1: UpNotice OK
Disposing consumer 1
Consumer 1: ParentSessionDown Session for Flow disconnected
Starting consumer 2
Consumer 2: UpNotice OK
Flow 2 is null
Disposing consumer 2
Consumer 2: ParentSessionDown Session for Flow disconnected
For consumer 2 the flow becomes null and, therefore, the messages are not properly acknowledged. Sometimes the flow becomes null for the first consumer. Seems random.
I also get a warning:
Warning - FlowImpl: 09:10:18 | Client Application threw an exception in flow rx message callback | System.NullReferenceException: Object reference not set to an instance of an object.
   at SolaceConsumer.MessageEventHandler(Object source, MessageEventArgs args) in C:\...\SolaceConsumer.cs:line 139
   at SolaceSystems.Solclient.Messaging.Native.FlowImpl.Flow_RxCallbackFuncV2(IntPtr opaqueFlow, IntPtr opaqueMessage, IntPtr user)
So, any idea why the flow becomes null?
0 -
Hi @Jordi, I strongly suspect that your exception is the problem. The exception is thrown in the message receive delegate. I'm not 100% sure, but I'm confident that this will cause the API to shut down the flow.
The reason is that the API knows the RX delegate is broken (because there has been an exception), so messages won't be processed properly. To prevent endless exceptions, the flow is terminated. I haven't tried this in .NET, but that's what the Java API does.
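To rule out exceptions escaping from the receive delegate, the handler can be wrapped so that any exception is caught and logged before it reaches the API's callback thread. Below is a minimal sketch in plain .NET; `SafeHandler`, `Wrap` and `Demo` are hypothetical names for illustration and are not part of the Solace API, and the `flow` variable only stands in for an unassigned `_flow` field.

```csharp
using System;

public static class SafeHandler
{
    // Returns a handler that forwards to `inner` and reports any exception
    // to `onError` instead of letting it propagate to the caller.
    public static EventHandler<T> Wrap<T>(EventHandler<T> inner, Action<Exception> onError)
        where T : EventArgs
    {
        return (sender, args) =>
        {
            try
            {
                inner(sender, args);
            }
            catch (Exception e)
            {
                onError(e);
            }
        };
    }
}

public static class Demo
{
    public static void Main()
    {
        object flow = null; // stand-in for a _flow field that has not been assigned

        EventHandler<EventArgs> handler = SafeHandler.Wrap<EventArgs>(
            (s, a) => Console.WriteLine(flow.ToString()), // throws NullReferenceException
            e => Console.WriteLine($"Handler failed: {e.GetType().Name}"));

        handler(null, EventArgs.Empty); // prints "Handler failed: NullReferenceException"
    }
}
```

In the consumer above, the wrapped delegate would presumably be passed to CreateFlow in place of MessageEventHandler, for example `SafeHandler.Wrap<MessageEventArgs>(MessageEventHandler, e => Console.WriteLine(e))`. This only keeps the exception away from the API; a message that was never acknowledged still stays on the queue.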
0 -
Hi TomF,
I am not sure what you mean.
In the MessageEventHandler I do not handle any exceptions. There are just conditional statements. Rather, the exception is thrown because the flow is null. The exception in the constructor is never thrown.
My hypothesis is that the Solace client has problems when multiple consumers (sessions, flows, etc.) for the same endpoint are created and disposed within a short time. Maybe the context or session caches something and then the flow assignment becomes ambiguous.
I assume this is usually not an issue, only here in this test scenario.
best,
Jörg
0 -
Hi @Jordi, ah, I see! The exception is thrown because the flow is null.
Back to @Aaron's comment, your .dispose() is rather brutal. Not only are you closing the flow, you are also disconnecting (and disposing of) the session. Unless you have very good performance reasons, use the same session for all your flows: opening a session for every flow like this is very expensive. What's more, you're then disposing of the context - again, this is unnecessary.
This is speculation, but what might be happening is some kind of race condition between consumer 1 being closed, consumer 2 starting, then the underlying session or context being disposed.
I recommend trying the 1 session, 1 context approach, and seeing if that fixes your problem.
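As a rough sketch of what "1 session, 1 context" could look like: a single holder object owns the context and session for the whole test run, and each consumer only opens and disposes its own flow. `SharedSolaceConnection` and `OpenFlow` are hypothetical names; the Solace calls are the same ones already used in the consumer code above. It needs the Solace .NET API package and a reachable broker, so treat it as an outline rather than a tested implementation.

```csharp
using System;
using SolaceSystems.Solclient.Messaging;

// Hypothetical holder: one context and one session shared by all consumers.
public sealed class SharedSolaceConnection : IDisposable
{
    public IContext Context { get; }
    public ISession Session { get; }

    public SharedSolaceConnection(SessionProperties sessionProps)
    {
        // Initialise the API once, not once per consumer.
        ContextFactory.Instance.Init(new ContextFactoryProperties
        {
            SolClientLogLevel = SolLogLevel.Warning
        });

        Context = ContextFactory.Instance.CreateContext(new ContextProperties(), null);
        Session = Context.CreateSession(sessionProps, null, null);

        ReturnCode rc = Session.Connect();
        if (rc != ReturnCode.SOLCLIENT_OK)
            throw new InvalidOperationException($"Connect failed: {rc}");
    }

    // Each consumer gets its own flow on the shared session.
    public IFlow OpenFlow(string queueName, FlowProperties props,
                          EventHandler<MessageEventArgs> onMessage,
                          EventHandler<FlowEventArgs> onFlowEvent)
    {
        IQueue queue = ContextFactory.Instance.CreateQueue(queueName);
        return Session.CreateFlow(props, queue, null, onMessage, onFlowEvent);
    }

    // Called once, after every consumer has disposed its flow.
    public void Dispose()
    {
        Session.Disconnect();
        Session.Dispose();
        Context.Dispose();
        ContextFactory.Instance.Cleanup();
    }
}
```

With this layout, a consumer's Dispose() would shrink to `_flow.Stop(); _flow.Dispose();`, and the session, context and factory cleanup would run only once at the end of the test run.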
0 -
Hi TomF,
yes, some sort of race condition is our suspicion too.
Thanks for your hint not to open one session per flow. Usually, this is fine for us, as we do not open several sessions one after the other, except in this situation, when we run multiple tests. I will change the code in that respect.
Best,
Jordi
1