spring cloud stream solace pubsub

kirthi
kirthi Member Posts: 22 ✭✭

To handle publish error, using the @serviceActivator annotation.

@ServiceActivator(inputChannel ="topic.errors")  

After springboot version 3.3.2 upgrade and spring cloud stream version 2023.0.2 upgrade, the publish errors are not handled by the serviceActivator.

spring.cloud.stream.solace.bindings.receiveMessage-out-0.producer.error-channel-enabled = true

Answers

  • kirthi
    kirthi Member Posts: 22 ✭✭
    edited August 27 #2

    I am able to replicate the publish error, but the handler is not handling the error message
    {"instant":{"epochSecond":1724745793,"nanoOfSecond":375411000},"thread":"Context_2_ProducerDispatcher","level":"ERROR","loggerName":"org.springframework.integration.handler.LoggingHandler","message":"org.springframework.messaging.MessagingException: Producer received error during publishing (Spring message 6b31cc8d-373f-501d-5282-4c9e082cb33c) at 1724745793361, failedMessage=GenericMessage [payload=byte[15309], headers={contentType=application/json, id=6b31cc8d-373f-501d-5282-4c9e082cb33c, timestamp=1724745784290}]\n\tat com.solace.spring.cloud.stream.binder.util.ErrorChannelSendingCorrelationKey.send(ErrorChannelSendingCorrelationKey.java:59)\n\tat com.solace.spring.cloud.stream.binder.util.JCSMPSessionProducerManager$CloudStreamEventHandler.handleErrorEx(JCSMPSessionProducerManager.java:83)\n\tat com.solacesystems.jcsmp.protocol.nio.impl.ProducerErrorNotification.handleNotification(ProducerErrorNotification.java:53)\n\tat com.solacesystems.jcsmp.protocol.nio.impl.ProducerNotificationDispatcher.eventLoop(ProducerNotificationDispatcher.java:83)\n\tat com.solacesystems.jcsmp.protocol.nio.impl.ProducerNotificationDispatcher.run(ProducerNotificationDispatcher.java:101)\n\tat java.base/java.lang.Thread.run(Thread.java:833)\nCaused by: ((Client name: JQ439QQW19/64450/aac594298ef0262d0001/KbKKSeBpQB local(/10.210.248.200:64633) remote()) - ) com.solacesystems.jcsmp.JCSMPErrorResponseException: 403: Publish ACL Denied - Topic '«topicName»' [Subcode:28]\n\tat com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.getAndProcessResponse(JCSMPXMLMessageProducer.java:1530)\n\tat com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer.handlePubMsgResponse(JCSMPXMLMessageProducer.java:2102)\n\tat com.solacesystems.jcsmp.impl.flow.PubFlowManager.handlePubMsgResponse(PubFlowManager.java:708)\n\tat com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.handleMessage(TcpClientChannel.java:1796)\n\tat com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.onMessage(TcpClientChannel.java:2773)\n\tat com.solacesystems.jcsmp.protocol.smf.SMFFrameHandler.decode(SMFFrameHandler.java:103)\n\tat com.solace.transport.impl.netty.NettyTransportInboundFrameDecoderAdapter.decode(NettyTransportInboundFrameDecoderAdapter.java:29)\n\tat io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)\n\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1473)\n\tat io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1336)\n\tat io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1385)\n\tat io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:530)\n\tat io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:469)\n\tat io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93)\n\tat com.solace.transport.handler.SolSSLStatsHandler.channelRead(SolSSLStatsHandler.java:36)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)\n\tat io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1407)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)\n\tat io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)\n\tat io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:918)\n\tat io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)\n\tat io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)\n\tat io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)\n\tat io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:994)\n\tat io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)\n\tat io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)\n\t... 1 more\n","endOfBatch":true,"loggerFqcn":"org.apache.commons.logging.LogAdapter$Log4jLog","contextMap":{},"threadId":74,"threadPriority":5,"source":{"classLoaderName":"app","class":"org.springframework.core.log.LogAccessor","method":"error","file":"LogAccessor.java","line":250}}

  • kirthi
    kirthi Member Posts: 22 ✭✭

    similar error reported :
    https://stackoverflow.com/questions/76991394/spring-cloud-stream-4-0-4-producer-error-channel-enabled-property-not-working

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 956 admin

    Hi @kirthi,

    In Spring Cloud Stream v3.2.5 the way to define an error handler was made more clear. You can now add a error-handler-definition to each input binding that will receive the errors that occur.

    Learn more in section 10 here (search for "Spring Cloud Stream v3.2.5 or higher"):

    https://codelabs.solace.dev/codelabs/spring-cloud-stream-beyond/#9 .

    I would also recommend taking a look at this great blog by @giri which includes some nice diagrams that help ensure developers can understand what's going on between the error handling options provided by the framework and the Solace binder:

    https://solace.com/blog/error-handling-spring-cloud-stream-binder-solace-pubsub/

    Hope that helps and sorry for the confusion!

  • kirthi
    kirthi Member Posts: 22 ✭✭

    Hi @marc ,

    Thanks for replying.

    My question is related to explicitly handling the Publish error. The issues could be like ACL issues, spool over quota in the destination topic.

    Previously, we have handled publish errors as below,

    @ServiceActivator(inputChannel = "destination_topic.errors")

    public void handlePublishError(ErrorMessage message) {
    

    After spring upgrade, this code is not working.

    Checked in this document,

    https://solace.com/blog/publication-and-receipt-spring-cloud-stream-binder-solace/

    IMPORTANT:

     In the current version of the Spring Cloud Stream binder for Solace PubSub+, this annotation-based error handler specification approach is valid and fully supported. However, this will be deprecated in future releases and adopt a configuration property at the binding level to specify the error handler.

    Could you please give an example what is meant by configuration property at the binding level.

    I have tried,

    spring.cloud.stream.bindings.receiveMessage-out-0.producer.error-handler-definition:myErrorHandler
    

    This handler is not capturing the publish errors.

    Now, the fix we have implemented is as below (after spring upgrade),

    @ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
    public void handlePublishError(ErrorMessage message) {}

    But the consumer errors (thrown by the application) also listened by this interceptor.

    is there any documentation to understand how to handle only publish errors?

  • Jeff
    Jeff Member, Employee Posts: 3 Solace Employee
    edited August 29 #6

    Hi @kirthi ,

    It seems like error-handler-definition doesn't work for producer bindings in Spring Cloud Stream. I've raised an issue to ask about this here:

    https://github.com/spring-cloud/spring-cloud-stream/issues/2997

    As an alternative solution, you can try using the Publisher Confirmations feature in the Solace binder to handle publish errors:

    https://github.com/SolaceProducts/solace-spring-cloud/tree/master/solace-spring-cloud-starters/solace-spring-cloud-stream-starter#publisher-confirmations