spring cloud stream solace pubsub
To handle publish error, using the @serviceActivator annotation.
@ServiceActivator(inputChannel ="topic.errors")
After springboot version 3.3.2 upgrade and spring cloud stream version 2023.0.2 upgrade, the publish errors are not handled by the serviceActivator.
spring.cloud.stream.solace.bindings.receiveMessage-out-0.producer.error-channel-enabled = true
Answers
-
I am able to replicate the publish error, but the handler is not handling the error message
{"instant":{"epochSecond":1724745793,"nanoOfSecond":375411000},"thread":"Context_2_ProducerDispatcher","level":"ERROR","loggerName":"org.springframework.integration.handler.LoggingHandler","message":"org.springframework.messaging.MessagingException: Producer received error during publishing (Spring message 6b31cc8d-373f-501d-5282-4c9e082cb33c) at 1724745793361, failedMessage=GenericMessage [payload=byte[15309], headers={contentType=application/json, id=6b31cc8d-373f-501d-5282-4c9e082cb33c, timestamp=1724745784290}]\n\tat com.solace.spring.cloud.stream.binder.util.ErrorChannelSendingCorrelationKey.send(ErrorChannelSendingCorrelationKey.java:59)\n\tat com.solace.spring.cloud.stream.binder.util.JCSMPSessionProducerManager$CloudStreamEventHandler.handleErrorEx(JCSMPSessionProducerManager.java:83)\n\tat com.solacesystems.jcsmp.protocol.nio.impl.ProducerErrorNotification.handleNotification(ProducerErrorNotification.java:53)\n\tat com.solacesystems.jcsmp.protocol.nio.impl.ProducerNotificationDispatcher.eventLoop(ProducerNotificationDispatcher.java:83)\n\tat com.solacesystems.jcsmp.protocol.nio.impl.ProducerNotificationDispatcher.run(ProducerNotificationDispatcher.java:101)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: ((Client name: JQ439QQW19/64450/aac594298ef0262d0001/KbKKSeBpQB local(/10.210.248.200:64633) remote()) - ) com.solacesystems.jcsmp.JCSMPErrorResponseException: 403: Publish ACL Denied - Topic '«topicName»' [Subcode:28]\n\tat com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.getAndProcessResponse(JCSMPXMLMessageProducer.java:1530)\n\tat com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.handlePubMsgResponse(JCSMPXMLMessageProducer.java:2102)\n\tat com.solacesystems.jcsmp.impl.flow.PubFlowManager.handlePubMsgResponse(PubFlowManager.java:708)\n\tat com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.handleMessage(TcpClientChannel.java:1796)\n\tat com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.onMessage(TcpClientChannel.java:2773)\n\tat com.solacesystems.jcsmp.protocol.smf.SMFFrameHandler.decode(SMFFrameHandler.java:103)\n\tat com.solace.transport.impl.netty.NettyTransportInboundFrameDecoderAdapter.decode(NettyTransportInboundFrameDecoderAdapter.java:29)\n\tat io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)\n\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1473)\n\tat io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1336)\n\tat io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1385)\n\tat io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)\n\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)\n\tat com.solace.transport.handler.SolSSLStatsHandler.channelRead(SolSSLStatsHandler.java:36)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)\n\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t... 1 more\n","endOfBatch":true,"loggerFqcn":"org.apache.commons.logging.LogAdapter$Log4jLog","contextMap":{},"threadId":74,"threadPriority":5,"source":{"classLoaderName":"app","class":"org.springframework.core.log.LogAccessor","method":"error","file":"LogAccessor.java","line":250}}0 -
Hi @kirthi,
In Spring Cloud Stream v3.2.5 the way to define an error handler was made more clear. You can now add a
error-handler-definition
to each input binding that will receive the errors that occur.Learn more in section 10 here (search for "Spring Cloud Stream v3.2.5 or higher"):
.
I would also recommend taking a look at this great blog by @giri which includes some nice diagrams that help ensure developers can understand what's going on between the error handling options provided by the framework and the Solace binder:
Hope that helps and sorry for the confusion!
0 -
Hi @marc ,
Thanks for replying.
My question is related to explicitly handling the Publish error. The issues could be like ACL issues, spool over quota in the destination topic.
Previously, we have handled publish errors as below,
@ServiceActivator(inputChannel = "destination_topic.errors")public void handlePublishError(ErrorMessage message) {
After spring upgrade, this code is not working.
Checked in this document,
https://solace.com/blog/publication-and-receipt-spring-cloud-stream-binder-solace/
IMPORTANT:In the current version of the Spring Cloud Stream binder for Solace PubSub+, this annotation-based error handler specification approach is valid and fully supported. However, this will be deprecated in future releases and adopt a configuration property at the binding level to specify the error handler.
Could you please give an example what is meant by configuration property at the binding level.
I have tried,
spring.cloud.stream.bindings.receiveMessage-out-0.producer.error-handler-definition:myErrorHandler
This handler is not capturing the publish errors.
Now, the fix we have implemented is as below (after spring upgrade),
@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
public void handlePublishError(ErrorMessage message) {}But the consumer errors (thrown by the application) also listened by this interceptor.
is there any documentation to understand how to handle only publish errors?
0 -
Hi @kirthi ,
It seems like
error-handler-definition
doesn't work for producer bindings in Spring Cloud Stream. I've raised an issue to ask about this here:As an alternative solution, you can try using the Publisher Confirmations feature in the Solace binder to handle publish errors:
https://github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#publisher-confirmations
1