Spring publisher confirmation when channel is closed

Options
bruno
bruno Member Posts: 3
Hi colleagues,
We use spring-cloud-starter-stream-solace 3.5.0 and we have a question regarding this documentation https://github.com/SolaceProducts/solace-spring-
cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#publisher-confirmations.

We are running a test scenario in which an application connection is interrupted using a Chaos Monkey service and we publish a message to that application.
We expected the broker to return an error in the publisher code, but it didn't.

Publisher code:

public void send()
{
CorrelationData correlationData = new CorrelationData();
Message<GovernmentCommunicationRequest> message = buildMessage(correlationData);

streamBridge.send("govConnectorSendToGovSender-out-0", "solace", message);

try {
correlationData.getFuture().get(10000, TimeUnit.MILLISECONDS);
}
catch (InterruptedException | ExecutionException | TimeoutException e) {
throw new RuntimeException(e);
}
}

private <T> Message<GovernmentsCommunicationRequest> buildMessage(CorrelationData correlationData) {
return MessageBuilder.withPayload(new GovernmentCommunicationRequest())
.setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, correlationData).build();
}

Could you help us identify what configuration we are missing to the "publisher flow" throw an error?

Best Answer

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 921 admin
    edited November 2023 #2 Answer ✓
    Options

    Hi @espinosada,

    There are a bunch of things to consider here:

    1. If your max redelivery count is 5 and you have messages going to the DMQ due to max deliveries that means they are failing 5 times before doing so. Do you have an idea why those 11 messages failed 5 times? Consider if they would have succeeded if they were tried 5 or 10 or 30 seconds later or if they were just payloads that couldn't be parsed so they would have continued to fail. Note that by default Solace will redeliver immediately, but if your service is failing b/c of something that might be resolved with a little time you can configure delayed redelivery on the queue to make it wait prior to redelivery.

    I don't have tested it but it seems that messages in DMQ are not DMQ eligible, what if there is a reconnection in DMQ as well? The messages would be lost?

    2. You are correct that messages placed on DMQs are not DMQ eligible. That said, if you want to make sure you never lose a message you should make sure you have redelivery set to "try forever" (the default) on your DMQ to ensure messages are never discarded due to the number of redeliveries.

    Before migrating to solace we were using Rabbit MQ with TTL to expire from DLQ to main queue, but it is not clear if the same is possible with solace, it seems like its not.

    3. Solace out of the box doesn't allow you to chain DMQs together, however since you are using Spring Cloud Stream you could actually implement something like this by using the "error queue" that our binder allows you to configure using autoBindErrorQueue. That error queue is just a regular Solace queue so you could configure it so it's DMQ is your initial delivery queue. You have just have to be careful when doing so as you could end up with messages going in an infinite loop if you don't have some other sort of error checking. You may just want to have an app that listens to your DMQ, does some sort of check to make sure then message is processable and then have it published back to the main queue.

    Hope that helps!

Answers

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 921 admin
    Options

    Hi @bruno,

    Can you share the application logs at the INFO or more granular level? By default when an app loses connection it won't just fail, it will try to reconnect based on your configuration.

  • bruno
    bruno Member Posts: 3
    edited November 2023 #4
    Options
    Hi @marc! Thanks for your reply. I've run other tests and it seems that there's a problem with consuming the message.

    1. The message queuing service connection was interrupted in the "consumer" app, using the Chaos Monkey service.
    2. A message was published to the "consumer" app.
    3. The API GET request showed messageCount = 1 while the connection is interrupted.
    4. The message queuing service is reestablished and the API GET request shows messageCount = 0.
    5. The message was not consumed by the "consumer" app.

    We expected the "consumer" app to consume the message correctly after the connection was reestablished.

    Some logs:

    - Client-1: handleException(): Channel Closed Event (smfclient 1) with exception: Error communicating with the router. (KeepAlive) cur_stack=Thread: "Context_2_ReactorThread" daemon id=306, prio=5: com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.handleException(TcpClientChannel.java:1390) com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.handleKeepaliveException(TcpClientChannel.java:337) com.solacesystems.jcsmp.protocol.impl.KeepaliveTimeoutHandler.handleTimeout(KeepaliveTimeoutHandler.java:57) com.solacesystems.jcsmp.impl.timers.impl.JCSMPTimerQueueImpl.runAllTo(JCSMPTimerQueueImpl.java:91) com.solacesystems.jcsmp.protocol.nio.impl.SyncEventDispatcherReactor.eventLoop(SyncEventDispatcherReactor.java:168) com.solacesystems.jcsmp.protocol.nio.impl.SyncEventDispatcherReactor$SEDReactorThread.run(SyncEventDispatcherReactor.java:338) java.lang.Thread.run(Thread.java:750)

    - Client-1: (Client name: ___ Local addr: __ Local port: __ Remote addr: ___ Remote port: ___) - startReconnect(): Channel Closed Event (smfclient 1); exception: Error communicating with the router. (KeepAlive) cur_stack=Thread: "Context_2_ReactorThread" daemon id=306, prio=5: com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.startReconnect(TcpClientChannel.java:1351) com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.handleException(TcpClientChannel.java:1415) com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.handleKeepaliveException(TcpClientChannel.java:337) com.solacesystems.jcsmp.protocol.impl.KeepaliveTimeoutHandler.handleTimeout(KeepaliveTimeoutHandler.java:57) com.solacesystems.jcsmp.impl.timers.impl.JCSMPTimerQueueImpl.runAllTo(JCSMPTimerQueueImpl.java:91) com.solacesystems.jcsmp.protocol.nio.impl.SyncEventDispatcherReactor.eventLoop(SyncEventDispatcherReactor.java:168) com.solacesystems.jcsmp.protocol.nio.impl.SyncEventDispatcherReactor$SEDReactorThread.run(SyncEventDispatcherReactor.java:338) java.lang.Thread.run(Thread.java:750)

    - Client-1: (Client name: ___ Local addr: __ Local port: __ Remote addr: ___ Remote port: ___) - handleException(): Channel Closed Event (smfclient 1)

    .... some connection attempts ..

    - Client-1: Connected to host 'orig=tcps://___, scheme=tcps://, host=___, port=___' (smfclient 1)

    - Executing timed publisher retransmission.
  • marc
    marc Member, Administrator, Moderator, Employee Posts: 921 admin
    Options

    Hi @bruno,

    In your last response you mention "The message was not consumed by the "consumer" app", but I think you're just sharing the publisher part here?

    Note that if you have official Solace support it might be easier for you to share your actual code with them for more specific troubleshooting.

  • bruno
    bruno Member Posts: 3
    Options

    Hi @marc,

    Thanks for your reply. I'd like to reiterate that the error seems to occur when the Solace connection is reestablished in the application.

    1. The message queuing service connection was interrupted in the "consumer" app, using the Chaos Monkey service.
    2. A message was successfully published to the queue.
    3. The queue API GET request showed messageCount = 1 while the connection is out.
    4. The message queuing service is reestablished and the queue API GET request shows messageCount = 0.
    5. However, the message was not consumed by the "consumer" app.

    It seems that all messages in the queue are discarded when the connection is reestablished.

    Is this possible?

    Properties:
    spring.cloud.stream.bindings.govConnSolaceInput.binder=solace
    spring.cloud.stream.bindings.govConnSolaceInput.destination=gov-connector.v1.send-to-gov
    spring.cloud.stream.bindings.govConnSolaceInput.group=gov-connector
    spring.cloud.stream.bindings.govConnSolaceInput.consumer.concurrency=50
    spring.cloud.stream.solace.bindings.govConnSolaceInput.consumer.queueNameExpression=destination.trim()
    spring.cloud.stream.solace.bindings.govConnSolaceInput.consumer.errorQueueNameExpression=destination.trim() + '.error-queue'
    spring.cloud.stream.solace.bindings.govConnSolaceInput.consumer.autoBindErrorQueue=false
    spring.cloud.stream.solace.bindings.govConnSolaceInput.consumer.provisionDurableQueue=false
    spring.cloud.stream.solace.bindings.govConnSolaceInput.consumer.addDestinationAsSubscriptionToQueue=false


    Consumer code:

    ===================================================================================
    @EnableBinding(MessageInputProcessor.class)
    public class GovConnInputChannel {
    private static final Logger LOGGER = LoggerFactory.getLogger(GovConnInputChannel.class);

    @StreamListener(ChannelConstants.GOV_CONNECTOR_SOLACE_INPUT)
    public void consumeSolace(@Payload GovernmentCommunicationRequest request) {
    LOGGER.info("Received message via Solace: {}.", request);
    }
    }

    ===================================================================================

    Thanks for your suggestion, we have official Solace support and we are accessing them as well. 😉

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 921 admin
    Options

    Hi @bruno,

    The use of the annotations @EnableBinding and @StreamListenerhave been deprecated for quite some time in Spring Cloud Stream. If possible, I would try to move away from those and using Spring Cloud Functions as defined in the Spring Cloud Stream reference guide instead.

    There are only a few reasons that a message should be removed from a queue. The most common is b/c it was acknowledged by a consumer, but if you check PS+ Mgr for "stats" on a queue you will also find stats broken down per reason. I would suggest checking here and seeing what it tells you.

  • espinosada
    espinosada Member Posts: 1
    Options
    Hi, i'm a Bruno colleague and i will be updating this thread for now.

    The behavior remains with spring cloud functions.
    Maybe the messages were directed into DMQ but since there is no DMQ configured they were discarded.
    I don't have tested it but it seems that messages in DMQ are not DMQ eligible, what if there is a reconnection in DMQ as well? The messages would be lost?
    Before migrating to solace we were using Rabbit MQ with TTL to expire from DLQ to main queue, but it is not clear if the same is possible with solace, it seems like its not.

    I'm using this topic to find the best way to not lose any message using Solace, thanks for your help.

    - Messages Redelivered - 55
    - Messages Queued (msgs) - 319
    - Attempted move to DMQ due to Maximum Redeliveries, Failed - 11
    - Maximum Redelivery Count - 5
  • marc
    marc Member, Administrator, Moderator, Employee Posts: 921 admin
    edited November 2023 #9 Answer ✓
    Options

    Hi @espinosada,

    There are a bunch of things to consider here:

    1. If your max redelivery count is 5 and you have messages going to the DMQ due to max deliveries that means they are failing 5 times before doing so. Do you have an idea why those 11 messages failed 5 times? Consider if they would have succeeded if they were tried 5 or 10 or 30 seconds later or if they were just payloads that couldn't be parsed so they would have continued to fail. Note that by default Solace will redeliver immediately, but if your service is failing b/c of something that might be resolved with a little time you can configure delayed redelivery on the queue to make it wait prior to redelivery.

    I don't have tested it but it seems that messages in DMQ are not DMQ eligible, what if there is a reconnection in DMQ as well? The messages would be lost?

    2. You are correct that messages placed on DMQs are not DMQ eligible. That said, if you want to make sure you never lose a message you should make sure you have redelivery set to "try forever" (the default) on your DMQ to ensure messages are never discarded due to the number of redeliveries.

    Before migrating to solace we were using Rabbit MQ with TTL to expire from DLQ to main queue, but it is not clear if the same is possible with solace, it seems like its not.

    3. Solace out of the box doesn't allow you to chain DMQs together, however since you are using Spring Cloud Stream you could actually implement something like this by using the "error queue" that our binder allows you to configure using autoBindErrorQueue. That error queue is just a regular Solace queue so you could configure it so it's DMQ is your initial delivery queue. You have just have to be careful when doing so as you could end up with messages going in an infinite loop if you don't have some other sort of error checking. You may just want to have an app that listens to your DMQ, does some sort of check to make sure then message is processable and then have it published back to the main queue.

    Hope that helps!