Spring Cloud Stream Solace binder: unexpected error from StreamBridge.send()

Hi Solace team, we are using the Spring Cloud Stream Solace binder in our Spring Boot project. When we send messages simultaneously from 5 different threads using `StreamBridge.send()`, the underlying implementation "randomly" throws `org.springframework.messaging.MessagingException`. By "randomly" I mean it happens regardless of whether I send 1000 messages at the same time or 50.

Library version:
`implementation("com.solace.spring.cloud:spring-cloud-starter-stream-solace:4.2.0")`

Full stack trace:

org.springframework.messaging.MessagingException: Cannot send message using handler 028dfc28-458f-46d9-a669-aa22169323fd
	at com.solace.spring.cloud.stream.binder.util.ErrorChannelSendingCorrelationKey.send(ErrorChannelSendingCorrelationKey.java:57)
	at com.solace.spring.cloud.stream.binder.outbound.JCSMPOutboundMessageHandler.handleMessagingException(JCSMPOutboundMessageHandler.java:188)
	at com.solace.spring.cloud.stream.binder.outbound.JCSMPOutboundMessageHandler.handleMessage(JCSMPOutboundMessageHandler.java:76)
	at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1185)
	at org.springframework.integration.handler.AbstractMessageHandler.doHandleMessage(AbstractMessageHandler.java:105)
	at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:73)
	at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:132)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
	at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
	at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
	at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:378)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:332)
	at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:302)
	at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:187)
	at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:146)
	at org.springframework.cloud.stream.function.StreamBridge.send(StreamBridge.java:141)
	at com.ingka.fms.inbound.reversereceiving.common.service.EventMeshPublisher.publish(EventMeshPublisher.kt:22)
	at com.ingka.fms.inbound.reversereceiving.common.service.EventMeshPublisher.publish$default(EventMeshPublisher.kt:18)
	at com.ingka.fms.inbound.reversereceiving.receiving.messaging.publisher.reverseReceivingUnitStatus.ReverseReceivingUnitStatusUpdatedPublisher.publish(ReverseReceivingUnitStatusUpdatedPublisher.kt:37)
	at com.ingka.fms.inbound.reversereceiving.receiving.messaging.publisher.reverseReceivingUnitStatus.ReverseReceivingUnitStatusUpdatedPublisher.createEventAndPublish(ReverseReceivingUnitStatusUpdatedPublisher.kt:73)
	at com.ingka.fms.inbound.reversereceiving.common.service.UnitService.publishEvents(UnitService.kt:281)
	at com.ingka.fms.inbound.reversereceiving.common.service.UnitService.updateUnitStatusAndPublishEvents(UnitService.kt:391)
	at com.ingka.fms.inbound.reversereceiving.common.service.UnitService.updateUnitStatusAndPublishEvent(UnitService.kt:253)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:351)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:196)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765)
	at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
	at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:392)
	at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
	at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:184)
	at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:765)
	at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:717)
	at com.ingka.fms.inbound.reversereceiving.common.service.UnitService$$SpringCGLIB$$0.updateUnitStatusAndPublishEvent(<generated>)
	at com.ingka.fms.inbound.reversereceiving.receiving.service.ReceivingService.completeReceivingUnit(ReceivingService.kt:563)
	at com.ingka.fms.inbound.reversereceiving.receiving.service.ReceivingService.completeReceivingUnits(ReceivingService.kt:554)
	at com.ingka.fms.inbound.reversereceiving.receiving.service.ReceivingService.completeReceiving$lambda$8(ReceivingService.kt:298)
	at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: com.solace.spring.cloud.stream.binder.util.ClosedChannelBindingException: Message handler 028dfc28-458f-46d9-a669-aa22169323fd is not running
	... 41 common frames omitted

Answers

  • marc
    marc Member, Administrator, Moderator, Employee
    edited July 2024

    Hi @siavashsoleymani,

    Good to see you again! It's been a while :) And thank you for sharing. Do you have code you can share that easily reproduces it? Also, could you open a GitHub issue on this? It sounds like it's going to be a bug. If you have official support, I'd also suggest reporting it through that channel to help get a fix prioritized.

    https://github.com/SolaceProducts/solace-spring-cloud

    Also, as a heads-up, I deleted the duplicate discussion that was started and kept the one marked as a "Question".

  • siavashsoleymani
    siavashsoleymani Member

    Hi @marc, nice to see you too!
    I am trying to write a simple piece of code that reproduces it, but unfortunately I have not been able to replicate it locally yet. I am going to create an issue on GitHub, and if I manage to reproduce it locally I will add the code there.

  • siavashsoleymani
    siavashsoleymani Member
    edited July 2024

    Maybe this gives more of a clue: I also see this in the logs:

    Stopping producer to TOPIC xxx/xxx/status/updated/V2/FA/FMSRR/xx/xx/ddd/aaaa/rc <message handler ID: 088fd415-8c5c-4ea0-91af-c319403403d3>

    Just before that log line, this one appears:

    logger:  c.s.s.c.s.b.util.SharedResourceManager
    message:  088fd415-8c5c-4ea0-91af-c319403403d3 is not the last user, persisting producer...

  • siavashsoleymani
    siavashsoleymani Member

    @marc After a quick search in the Solace binder code, I see that this log comes from a `stop()` method, so I assume that in a multithreaded environment one thread may be closing the publisher while another thread is still in its sending phase.
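The interleaving described above can be reproduced deterministically in a small stdlib-only Java model. This is a sketch, not the binder's code: `ModelHandler` is a hypothetical stand-in for `JCSMPOutboundMessageHandler`, and `IllegalStateException` stands in for `ClosedChannelBindingException`. Latches force the order: one thread resolves the handler, a second thread stops it, and only then does the first thread send.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an outbound message handler; NOT the Solace binder's class.
class ModelHandler {
    private final String id;
    private final AtomicBoolean running = new AtomicBoolean(true);

    ModelHandler(String id) { this.id = id; }

    void stop() { running.set(false); }

    void send(String payload) {
        // Same kind of check that surfaces as "Message handler ... is not running".
        if (!running.get()) {
            throw new IllegalStateException("Message handler " + id + " is not running");
        }
        // A real handler would hand the payload to its producer here.
    }
}

class StopWhileSendingDemo {
    // Forces the bad interleaving with latches and returns what the sender thread saw.
    static Throwable run() {
        ModelHandler handler = new ModelHandler("028dfc28");
        CountDownLatch lookedUp = new CountDownLatch(1);
        CountDownLatch stopped = new CountDownLatch(1);
        AtomicReference<Throwable> failure = new AtomicReference<>();

        Thread sender = new Thread(() -> {
            ModelHandler h = handler;            // 1. sender resolves the handler
            lookedUp.countDown();
            try {
                stopped.await();                 // 3. ...and sends only after the stop
                h.send("payload");
            } catch (Throwable t) {
                failure.set(t);
            }
        });
        Thread stopper = new Thread(() -> {
            try {
                lookedUp.await();
                handler.stop();                  // 2. another thread stops the handler
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                stopped.countDown();
            }
        });
        sender.start();
        stopper.start();
        try {
            sender.join();
            stopper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException("interrupted while waiting for demo threads", e);
        }
        return failure.get();
    }

    public static void main(String[] args) {
        // Prints: java.lang.IllegalStateException: Message handler 028dfc28 is not running
        System.out.println(run());
    }
}
```

In a real application no latches are involved; under concurrent load this order would simply occur some of the time, which would be consistent with the failure looking random regardless of message count.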

  • siavashsoleymani
    siavashsoleymani Member

    Also, the difference between my small local project and our production app is that locally this opening/closing of producers doesn't happen, while in production it happens frequently.

  • marc
    marc Member, Administrator, Moderator, Employee

    Thanks for the update @siavashsoleymani,

    I think you found the key here:

    Caused by: com.solace.spring.cloud.stream.binder.util.ClosedChannelBindingException: Message handler 028dfc28-458f-46d9-a669-aa22169323fd is not running

    This does indeed mean that someone or something is administratively stopping the producer binding, either in code or by a person/machine calling the bindings actuator REST endpoint.
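For reference, an administrative stop through the actuator is an HTTP POST to Spring Cloud Stream's `bindings` endpoint with the desired state in the body. The Java sketch below only builds such a request with the JDK's `java.net.http` API and never sends it; the base URL and binding name are hypothetical, and the endpoint only accepts this call if it has been exposed (for example with `management.endpoints.web.exposure.include=bindings`). Searching deployment scripts or monitoring tools for calls of this shape is one way to rule this cause in or out.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Builds, but does not send, a request asking the Spring Cloud Stream "bindings"
// actuator endpoint to stop one binding. Base URL and binding name are hypothetical.
class BindingStopRequest {
    static HttpRequest stop(String baseUrl, String bindingName) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/actuator/bindings/" + bindingName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString("{\"state\":\"STOPPED\"}"))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = stop("http://localhost:8080", "my-output-binding");
        // Prints: POST http://localhost:8080/actuator/bindings/my-output-binding
        System.out.println(request.method() + " " + request.uri());
        // Sending it would be: HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())
    }
}
```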

    So we need to find out why it is being stopped in production. Are you deploying into K8s or something that is performing a health check and trying to restart the container if it fails?

  • siavashsoleymani
    siavashsoleymani Member

    @marc, after some more investigation I realized that the cardinality of the topics matters. Locally I had been producing all messages to the same topic; after adding a random suffix to the topic names, I could see in the logs that the application opens and closes the handlers:

    2024-07-16T14:52:25.900+02:00 INFO 68298 --- [ool-1-thread-16] c.s.s.c.s.b.util.SharedResourceManager : b15b7319-a225-4d6f-bcb2-0111ce5ae655 is not the last user, persisting producer...
    2024-07-16T14:52:25.901+02:00 INFO 68298 --- [ool-1-thread-17] .s.s.c.s.b.o.JCSMPOutboundMessageHandler : Creating producer to TOPIC ingka.ilo.fms/inboundUnit/status/receiving-completed/V1/EU/FMSRR/BE/STO/siavash3 <message handler ID: 3db42e77-8a15-4eff-a842-c39ed5163e20>
    2024-07-16T14:52:25.901+02:00 INFO 68298 --- [ool-1-thread-17] o.s.c.s.m.DirectWithAttributesChannel : Channel 'unknown.channel.name' has 1 subscriber(s).
    2024-07-16T14:52:25.901+02:00 INFO 68298 --- [ool-1-thread-17] .s.s.c.s.b.o.JCSMPOutboundMessageHandler : Stopping producer to TOPIC ingka.ilo.fms/inboundUnit/status/receiving-completed/V1/EU/FMSRR/BE/STO/siavash39 <message handler ID: c29b584f-2895-4d2c-9c14-0417c68b297c>
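These logs are consistent with StreamBridge keeping one output binding per dynamic destination in a bounded, least-recently-used cache sized by `spring.cloud.stream.dynamic-destination-cache-size` (default 10), and stopping the producer of whichever binding it evicts. The stdlib-only Java model below assumes exactly that behavior (it is not StreamBridge's source, and the class names are hypothetical): once an 11th distinct topic arrives, the least recently used producer is stopped, and a thread that had already resolved it fails on its next send.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model: a bounded LRU cache of per-topic producers where eviction stops
// the evicted producer. This ASSUMES StreamBridge behaves this way; it is not its source.
class DynamicDestinationCacheModel {
    static final class Producer {
        final String topic;
        volatile boolean running = true;

        Producer(String topic) { this.topic = topic; }

        void send(String payload) {
            if (!running) {
                throw new IllegalStateException("Producer to " + topic + " is not running");
            }
        }
    }

    final List<String> stoppedTopics = new ArrayList<>();
    private final Map<String, Producer> cache;

    DynamicDestinationCacheModel(int cacheSize) {
        // accessOrder = true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<String, Producer>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Producer> eldest) {
                boolean evict = size() > cacheSize;
                if (evict) {
                    eldest.getValue().running = false;   // like "Stopping producer to TOPIC ..."
                    stoppedTopics.add(eldest.getKey());
                }
                return evict;
            }
        };
    }

    // Looks up (or creates) the producer for a topic. The lock keeps the map consistent,
    // but a caller may still hold a producer that a later lookup evicts.
    synchronized Producer producerFor(String topic) {
        Producer p = cache.get(topic);
        if (p == null) {
            p = new Producer(topic);                     // like "Creating producer to TOPIC ..."
            cache.put(topic, p);
        }
        return p;
    }

    public static void main(String[] args) {
        DynamicDestinationCacheModel bridge = new DynamicDestinationCacheModel(10);
        Producer first = bridge.producerFor("topic/0");  // thread A resolves a producer...
        for (int i = 1; i <= 10; i++) {
            bridge.producerFor("topic/" + i);            // ...other threads add 10 more topics
        }
        System.out.println(bridge.stoppedTopics);        // [topic/0]
        try {
            first.send("payload");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());          // Producer to topic/0 is not running
        }
    }
}
```

With a single topic (as in the original local test) nothing is ever evicted, which would explain why the producers were not opened and closed locally.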

    Reviewing the code in `SharedResourceManager.java` and `JCSMPOutboundMessageHandler.java` in the Solace library, one potential issue stands out: the lifecycle management and synchronization of shared resources.

    The `release(String key)` method in `SharedResourceManager.java` closes the shared resource when the last key is deregistered. However, if another thread calls `get(String key)` concurrently, just after `release` has found no remaining users but before it closes the resource, that thread might end up using a resource that is about to be closed. This race condition could lead to unexpected behavior or exceptions.
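One way to close that window is to make lookup and release mutually exclusive, so the close decision and a concurrent registration cannot interleave. The sketch below is a hypothetical `RefCountedResource` written for illustration, not a patch for the binder's `SharedResourceManager`; it guards `get`, `release` and `isOpen` with one monitor.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical reference-counted holder; illustrative only, not the binder's SharedResourceManager.
class RefCountedResource<T extends AutoCloseable> {
    private final Supplier<T> factory;
    private final Set<String> users = new HashSet<>();
    private T resource;

    RefCountedResource(Supplier<T> factory) { this.factory = factory; }

    // Registers the caller and returns the shared resource, creating it if needed.
    // Because release() holds the same lock, the two can never interleave.
    synchronized T get(String key) {
        if (resource == null) {
            resource = factory.get();
        }
        users.add(key);
        return resource;
    }

    // Deregisters the caller and closes the resource only if no user remains.
    synchronized void release(String key) {
        users.remove(key);
        if (users.isEmpty() && resource != null) {
            try {
                resource.close();
            } catch (Exception e) {
                throw new IllegalStateException("failed to close shared resource", e);
            } finally {
                resource = null;
            }
        }
    }

    synchronized boolean isOpen() {
        return resource != null;
    }

    public static void main(String[] args) {
        RefCountedResource<AutoCloseable> shared =
                new RefCountedResource<AutoCloseable>(() -> () -> System.out.println("closed"));
        shared.get("handler-1");
        shared.get("handler-2");
        shared.release("handler-1");   // handler-2 still registered: stays open
        shared.release("handler-2");   // last user: prints "closed"
    }
}
```

This only removes the check-then-close race inside the manager. A caller that keeps using a resource after releasing its own key can still hit a closed resource.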

    The `start()` and `stop()` methods manage the lifecycle. However, there's no explicit synchronization around the checks and operations on `isRunning` state and `producer` instance creation/release. In a multi-threaded scenario, this could lead to cases where multiple threads might concurrently attempt to start or stop the handler, potentially leading to inconsistent states or exceptions like `ClosedChannelBindingException` if one thread closes the producer while another is sending a message.
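For the start/stop concern, a common pattern is to let senders share a read lock while `stop()` takes the write lock, so the producer is never shut down in the middle of an in-flight send. The hypothetical `GuardedHandler` below sketches this with `ReentrantReadWriteLock`; a plain `Consumer<String>` stands in for the real producer.

```java
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;

// Hypothetical handler: sends share the read lock, start()/stop() take the write lock,
// so stop() waits for in-flight sends instead of closing the producer under them.
class GuardedHandler {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final Consumer<String> producer;   // stands in for the real producer
    private boolean running;                   // only accessed while holding the lock

    GuardedHandler(Consumer<String> producer) {
        this.producer = producer;
    }

    void start() {
        lock.writeLock().lock();
        try {
            running = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    void stop() {
        lock.writeLock().lock();               // blocks until current senders release the read lock
        try {
            running = false;
        } finally {
            lock.writeLock().unlock();
        }
    }

    void send(String payload) {
        lock.readLock().lock();                // many threads may send at the same time
        try {
            if (!running) {
                throw new IllegalStateException("handler is not running");
            }
            producer.accept(payload);
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        GuardedHandler handler = new GuardedHandler(p -> System.out.println("sent " + p));
        handler.start();
        handler.send("hello");                 // prints "sent hello"
        handler.stop();
        try {
            handler.send("late");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints "handler is not running"
        }
    }
}
```

Locking only orders the operations: a send that begins after `stop()` still fails, so this alone cannot prevent errors caused by a binding being stopped while other threads still intend to use it.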

  • marc
    marc Member, Administrator, Moderator, Employee
    edited July 2024 Answer ✓

    Thanks @siavashsoleymani,

    Out of curiosity, could you try adjusting the `spring.cloud.stream.dynamic-destination-cache-size` parameter (default 10) and see if that helps? If you know you are sending to roughly 100 different topics over your app's lifetime, try changing it to something like 110.

    If this is the issue, you might consider trying the second option for publishing to dynamic destinations, as it should also be more performant with Solace :). I covered it in section 6 here: https://codelabs.solace.dev/codelabs/spring-cloud-stream-beyond/?index=..%2F..index#5
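Choosing a value requires knowing roughly how many distinct destinations the app publishes to. The hypothetical helper below (plain Java, not part of Spring Cloud Stream or the Solace binder) records each destination name before it is passed to `StreamBridge.send()` and suggests a cache size with some headroom, following the 100 → 110 rule of thumb above. The result would then be set as `spring.cloud.stream.dynamic-destination-cache-size` in the application configuration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: records every destination name published to, from any thread,
// to estimate how large spring.cloud.stream.dynamic-destination-cache-size should be.
class DestinationCardinalityTracker {
    private final Set<String> seen = ConcurrentHashMap.newKeySet();

    // Call with the destination right before StreamBridge.send(destination, ...).
    void record(String destination) {
        seen.add(destination);
    }

    int distinctDestinations() {
        return seen.size();
    }

    // Distinct count plus headroom, rounded up with integer math: 100 topics at 10% gives 110.
    int suggestedCacheSize(int headroomPercent) {
        int distinct = distinctDestinations();
        return distinct + (distinct * headroomPercent + 99) / 100;
    }

    public static void main(String[] args) {
        DestinationCardinalityTracker tracker = new DestinationCardinalityTracker();
        for (int i = 0; i < 1000; i++) {
            tracker.record("orders/status/" + (i % 100));   // hypothetical topic names
        }
        System.out.println(tracker.distinctDestinations());  // 100
        System.out.println(tracker.suggestedCacheSize(10));  // 110
    }
}
```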

  • siavashsoleymani
    siavashsoleymani Member
    edited July 2024

    Hey @marc, good news! Increasing the cache size resolved the problem. Still, I think it is worth investing in a fix for the bug, especially for bigger projects using Solace where every MB of RAM counts.
    All in all, many thanks for your follow-ups and prompt responses that helped us resolve the issue.

  • marc
    marc Member, Administrator, Moderator, Employee

    Hi @siavashsoleymani,

    Glad it worked for you! Thanks for confirming. And yes, we are still going to look at it internally. To be honest, because of this issue we are considering recommending NOT using StreamBridge with dynamic destination names when publishing to many dynamic topics. Using the second option I referenced above is more efficient and not much harder.

    So basically, do this:

    streamBridge.send("some-fixed-output-binding", MessageBuilder.fromMessage(msg).setHeader(BinderHeaders.TARGET_DESTINATION, myDestination).build());

    instead of this:

    streamBridge.send(myDestination, msg);
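Why the first form avoids the problem can be shown with a small model: the binding name is fixed, so only one producer is ever created, and the topic travels with each message instead of being part of the binding name. The Java sketch below is hypothetical and stdlib-only; its `TARGET_DESTINATION` key is an illustrative stand-in for `BinderHeaders.TARGET_DESTINATION`, and it does not reproduce how the Solace binder resolves that header internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of "one fixed output binding, destination chosen per message".
// Not the binder's code; the header key below is illustrative only.
class FixedBindingModel {
    // Stand-in for BinderHeaders.TARGET_DESTINATION (the real constant's value may differ).
    static final String TARGET_DESTINATION = "target-destination";

    private final String bindingDestination;   // the fixed binding's configured destination
    int producersCreated = 0;
    final List<String> deliveredTo = new ArrayList<>();

    FixedBindingModel(String bindingDestination) {
        this.bindingDestination = bindingDestination;
    }

    synchronized void send(Map<String, String> headers, String payload) {
        if (producersCreated == 0) {
            producersCreated++;                 // the single binding's producer is created once
        }
        // Topic comes from the header; without it, the binding's own destination is used.
        deliveredTo.add(headers.getOrDefault(TARGET_DESTINATION, bindingDestination));
    }

    public static void main(String[] args) {
        FixedBindingModel binding = new FixedBindingModel("some-fixed-output-binding");
        for (int i = 0; i < 500; i++) {
            binding.send(Map.of(TARGET_DESTINATION, "orders/status/" + i), "payload");
        }
        System.out.println(binding.producersCreated);    // 1, no matter how many topics
        System.out.println(binding.deliveredTo.get(42)); // orders/status/42
    }
}
```

Because nothing is cached per topic in this model, there is nothing to evict, so the number of distinct topics no longer interacts with `spring.cloud.stream.dynamic-destination-cache-size`.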