Spring Cloud Stream: Solace messages not going to error queue

ejb3
ejb3 Member Posts: 6

Hi all,

Here's an overview of my issue:

I have two microservices, a producer and a consumer. The consumer reads from the MESSAGE.IN queue, hits an exception (as expected), and should then place the message on the error queue MESSAGE.IN.DLQ. Instead, the consumer consumes the message, hits the exception, retries 3 times, fails to publish to the error queue, and the failed message then stalls in the MESSAGE.IN queue. I expect the message not to be retried and to be sent straight to the error queue.


Here is my configuration:

spring.cloud.stream.solace:
  default:
    producer:
      provisionDurableQueue: false
      queueNamePrefix: null
      useFamiliarityInQueueName: false
      useDestinationEncodingInQueueName: false
    consumer:
      provisionDurableQueue: false
      useFamiliarityInQueueName: false
      useDestinationEncodingInQueueName: false
      useGroupNameInQueueName: false
      queueNamePrefix: null
      provisionErrorQueue: false
      autoBindErrorQueue: true
      useGroupNameInErrorQueueName: false
spring.cloud:
  function.definition: adapter
  stream:
    bindings:
      adapter-in-0:
        destination: MESSAGE.IN
        group: ADAPTER
        binder: solace
    solace:
      bindings:
        adapter-in-0:
          consumer:
            errorQueueNameOverride: MESSAGE.IN.DLQ


MESSAGE.IN and MESSAGE.IN.DLQ are preconfigured, preprovisioned queues. Looking at the consumer's logs, the queues appear to connect successfully (I can provide logs). I do see this error, though: "AD windowSize (255) is greater than router MaxDeliveredUnackedMessagesPerFlow (1), set AckThreshold to 0, folwId 57". I read about it in the Solace docs but found no solution, and I'm not sure how to configure it through the Spring YAML.

These are the logs when the message is read from MESSAGE.IN queue:

o.s.i.h.s.MessagingMethodInvokerHelper   : Overriding default instance of MessageHandlerMethodFactory with provided one.
c.n.f.dutchfbi.adapter.AdapterProcessor  : Message received in consumer <-- successfully consumed
s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination MESSAGE.IN - attempt 1
s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination MESSAGE.IN - attempt 1
c.n.f.dutchfbi.adapter.AdapterProcessor  : Message received in adapter: FDS42864d58e2fd48f59216d1220ce9ea46
s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination MESSAGE.IN - attempt 2
s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination MESSAGE.IN - attempt 2
c.n.f.dutchfbi.adapter.AdapterProcessor  : Message received in adapter: FDS42864d58e2fd48f59216d1220ce9ea46
s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination MESSAGE.IN - attempt 3
s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination MESSAGE.IN - attempt 3
o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' 
    ... My exception ...

c.s.s.c.s.b.u.SolaceErrorMessageHandler  : Processing message 9bbd4e62-5e7b-b6a0-07f5-f2dfac546f9f <failed-message: 8b0fb922-d9bb-cc21-1edc-ec8e671da35e, source-message: 1, >
backFactory$JCSMPAcknowledgementCallback : XMLMessage 1: Will be republished onto error queue MESSAGE.IN.DLQ
.s.b.u.ErrorQueueRepublishCorrelationKey : Republishing XMLMessage 1 to error queue MESSAGE.IN.DLQ - attempt 1 of 3
c.s.s.c.s.b.i.InboundXMLMessageListener  : Closing flow receiver to destination MESSAGE.IN
c.s.s.c.s.b.util.FlowReceiverContainer   : Unbinding flow receiver container 699eba3d-7771-4008-ac52-3bcb6fc1429e

I couldn't find a working way to set maxAttempts to 1. It also looks like the error queue is hooked up correctly, but I have no idea why the message cannot be placed on it. No other exception is logged.
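
For anyone landing here with the same question: the retry count seen in the logs (attempts 1 to 3) matches Spring Cloud Stream's generic `maxAttempts` consumer property, which defaults to 3. Below is a minimal sketch of disabling retries for the binding above. It assumes the property is set on the core binding (not under `spring.cloud.stream.solace`) and that nothing else in the app customizes the retry behaviour. It was not verified against this exact setup.

```yaml
spring.cloud.stream.bindings:
  adapter-in-0:
    consumer:
      # Standard Spring Cloud Stream consumer property (default 3).
      # 1 means the message is processed once with no retry.
      maxAttempts: 1
```

With this in place, the first failure should go directly to the binder's error handling, which is where the republish to MESSAGE.IN.DLQ is attempted.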

Any help would be greatly appreciated! Let me know if anyone would like to see more context.

Thanks

Comments

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 954 admin

    Hi @ejb3,

    Can you check the permissions on the MESSAGE.IN.DLQ queue? First, is your client-username the owner of the queue? If not, what are the non-owner permissions?

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 954 admin
    edited February 2022 #3

    Also @ejb3, does your app take any action on your exception? The binder should log something like `Could not send XMLMessage xyz to error queue abc` when it can't publish to the error queue, so I'm confused why we aren't seeing that. Any chance something is telling the app to shut down the Spring context?

  • ejb3
    ejb3 Member Posts: 6

    @marc Thanks for the reply,

    Yes, the client-username is the owner of the queue. Even when I go to Access Control -> Queues Owned, I see both MESSAGE.IN and MESSAGE.IN.DLQ. Regardless, the non-owner permissions are "Consume" for both.

    This is the function throwing the exception. Note that it is the adapter function the queue binding points to.

    @Bean
    @Throws(ContentNotFoundException::class)
    fun adapter(): Function<Event, String> {
        // Always throws, so every message should end up on the error queue
        return Function { event: Event ->
            throw ContentNotFoundException("Go to error queue")
        }
    }
    

    Nothing special about it, although I am wondering whether exception handling in functions is different.

    Looking around, I don't see anything very different from a normal Spring app. We're at the beginning of this project, so there's not much to it besides sending messages across Solace.

    These are my Solace dependencies, if that helps. Is there any specific dependency needed for error handling?

    implementation("org.springframework.boot:spring-boot-starter")
    implementation('org.springframework.boot:spring-boot-starter-integration:2.3.6.RELEASE')
    implementation('org.springframework.integration:spring-integration-feed:5.3.4.RELEASE')
    
    testImplementation('org.springframework.boot:spring-boot-starter-test')
    testImplementation 'org.springframework.integration:spring-integration-test:5.3.4.RELEASE'
    
    implementation ('com.solace.spring.cloud:spring-cloud-starter-stream-solace') {
        exclude group: 'org.springframework.boot', module: 'spring-boot-starter-cloud-connectors'
        exclude group: 'com.solace.cloud.cloudfoundry', module: 'solace-spring-cloud-connector'
        exclude group: 'com.solace.cloud.cloudfoundry', module: 'solace-java-cfenv'
        exclude group: 'io.pivotal.cfenv', module: 'java-cfenv-boot'
    }
    testImplementation 'org.springframework.cloud:spring-cloud-stream-test-support'
    // JMS Solace
    implementation 'com.solacesystems:sol-common:10.3.1'
    implementation 'com.solacesystems:sol-jcsmp:10.3.1'
    implementation 'com.solacesystems:sol-jms:10.3.1'
    implementation 'javax.jms:javax.jms-api:2.0.1'
    implementation 'com.sun.jndi:fscontext:1.2-beta-3'
    implementation 'com.sun.jndi:providerutil:1.2'
    implementation 'org.springframework:spring-jms:5.2.11.RELEASE'
    

    Also, for more context: I believe the queues are connecting correctly, since I see these logs on app boot:

    c.s.s.c.s.b.p.SolaceQueueProvisioner     : Creating durable queue MESSAGE.IN for consumer group ADAPTER
    c.s.s.c.s.b.p.SolaceQueueProvisioner     : Durable queue provisioning is disabled, MESSAGE.IN will not be provisioned nor will its configuration be validated
    c.s.s.c.s.b.p.SolaceQueueProvisioner     : Testing consumer flow connection to queue MESSAGE.IN (will not start it)
    c.s.jcsmp.impl.flow.BindRequestTask      : AD windowSize (255) is greater than router MaxDeliveredUnackedMessagesPerFlow (1), set AckThreshold to 0, folwId 56
    c.s.s.c.s.b.p.SolaceQueueProvisioner     : Connected test consumer flow to queue MESSAGE.IN, closing it
    c.s.s.c.s.b.p.SolaceQueueProvisioner     : Provisioning error queue MESSAGE.IN.DLQ
    c.s.s.c.s.b.p.SolaceQueueProvisioner     : Error Queue provisioning is disabled, MESSAGE.IN.DLQ will not be provisioned nor will its configuration be validated
    c.s.s.c.s.b.p.SolaceQueueProvisioner     : Testing consumer flow connection to queue MESSAGE.IN.DLQ (will not start it)
    c.s.jcsmp.impl.flow.BindRequestTask      : AD windowSize (255) is greater than router MaxDeliveredUnackedMessagesPerFlow (1), set AckThreshold to 0, folwId 57
    c.s.s.c.s.b.p.SolaceQueueProvisioner     : Connected test consumer flow to queue MESSAGE.IN.DLQ, closing it
    o.s.c.stream.binder.BinderErrorChannel   : Channel 'MESSAGE.IN.ADAPTER.errors' has 1 subscriber(s).
    o.s.c.stream.binder.BinderErrorChannel   : Channel 'MESSAGE.IN.ADAPTER.errors' has 2 subscriber(s).
    c.s.s.c.s.b.i.JCSMPInboundChannelAdapter : Creating 2 consumer flows for queue MESSAGE.IN <inbound adapter b76f5678-3b32-4924-b62b-a835f56d8e94>
    c.s.s.c.s.b.i.JCSMPInboundChannelAdapter : Creating consumer 1 of 2 for inbound adapter b76f5678-3b32-4924-b62b-a835f56d8e94
    c.s.s.c.s.b.util.FlowReceiverContainer   : Binding flow receiver container 699eba3d-7771-4008-ac52-3bcb6fc1429e
    c.s.jcsmp.impl.flow.BindRequestTask      : AD windowSize (255) is greater than router MaxDeliveredUnackedMessagesPerFlow (1), set AckThreshold to 0, folwId 58
    c.s.s.c.s.b.i.JCSMPInboundChannelAdapter : Creating consumer 2 of 2 for inbound adapter b76f5678-3b32-4924-b62b-a835f56d8e94
    c.s.s.c.s.b.util.FlowReceiverContainer   : Binding flow receiver container 8df4350c-8e2d-4846-b248-80aa56d27293
    c.s.jcsmp.impl.flow.BindRequestTask      : AD windowSize (255) is greater than router MaxDeliveredUnackedMessagesPerFlow (1), set AckThreshold to 0, folwId 59
    c.s.s.c.s.b.p.SolaceQueueProvisioner     : Subscribing queue MESSAGE.IN to topic MESSAGE.IN
    c.s.j.protocol.impl.TcpClientChannel     : Error Response (400) - Subscription Already Exists - Queue 'MESSAGE.IN' - Topic 'MESSAGE.IN'
    c.s.s.c.s.b.p.SolaceQueueProvisioner     : Queue MESSAGE.IN is already subscribed to topic MESSAGE.IN, SUBSCRIPTION_ALREADY_PRESENT error will be ignored...
    

    Is this AD windowSize message an issue at all?

  • ejb3
    ejb3 Member Posts: 6

    @marc Oh, also, I turned on debug logs. This is what is logged when the message fails:

    s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination MESSAGE.IN - attempt 1
    s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination MESSAGE.IN - attempt 1
    s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination MESSAGE.IN - attempt 2
    s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination MESSAGE.IN - attempt 2
    c.s.j.p.impl.KeepaliveTimeoutHandler     : Keepalive Timeout expired.
    c.s.j.p.impl.KeepaliveTimeoutHandler     : Sending keepalive message (1/10)
    c.s.jcsmp.protocol.impl.TcpChannel       : Scheduling KA timer in 3000ms
    s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination MESSAGE.IN - attempt 3
    s.b.i.RetryableInboundXMLMessageListener : Failed to consume a message from destination MESSAGE.IN - attempt 3
    o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@350e5097]; nested exception is com.ntrs.fds.dutchfbi.adapter.cfk.exception.FundUtilityContentNotFoundException: Go to DLQ, failedMessage=GenericMessage [payload=byte[210], headers={skip-type-conversion=false, solace_scst_messageVersion=1, solace_expiration=0, deliveryAttempt=3, solace_destination=MESSAGE.IN, solace_timeToLive=0, solace_receiveTimestamp=0, nativeHeaders={b3=[b7d8f68d16c12496-9ccfc467a8622286-1]}, b3=b7d8f68d16c12496-9ccfc467a8622286-1, acknowledgmentCallback=com.solace.spring.cloud.stream.binder.util.JCSMPAcknowledgementCallbackFactory$JCSMPAcknowledgementCallback@ffc9192, solace_discardIndication=false, solace_dmqEligible=true, solace_priority=-1, solace_redelivered=true, id=5732741f-335b-1247-1ec9-75531a92ce21, contentType=application/json, timestamp=1644420116351}]
        ... My exception ...
    
    c.s.s.c.s.b.u.SolaceErrorMessageHandler  : Processing message c142dbb0-b9c2-ddf1-26c5-899c3397b6ad <failed-message: 5732741f-335b-1247-1ec9-75531a92ce21, source-message: 1, >
    backFactory$JCSMPAcknowledgementCallback : XMLMessage 1: Will be republished onto error queue MESSAGE.IN.DLQ
    .s.b.u.ErrorQueueRepublishCorrelationKey : Republishing XMLMessage 1 to error queue MESSAGE.IN.DLQ - attempt 1 of 3
    c.s.s.c.s.b.i.InboundXMLMessageListener  : Closing flow receiver to destination MESSAGE.IN
    c.s.s.c.s.b.util.FlowReceiverContainer   : Unbinding flow receiver container e24647c4-87b6-44cc-8dda-a85278b8ed3b
    c.s.jcsmp.impl.flow.FlowHandleImpl       : Flow 15: pauseFlowInternally=false
    c.s.jcsmp.impl.flow.FlowHandleImpl       : Flow 15: _startState=STOPPED
    c.s.jcsmp.impl.flow.FlowHandleImpl       : Flow 15: Clear AD timer
    c.s.jcsmp.impl.flow.FlowHandleImpl       : Flow 15: Clear AD timer
    c.s.jcsmp.impl.flow.FlowHandleImpl       : Flow 15: window size 0 due to startState=STOPPED
    c.s.jcsmp.impl.flow.FlowHandleImpl       : Flow 15: getWindowSize()=0
    c.s.jcsmp.impl.flow.FlowSmfUtil          : tpCreateAck: flow_d: 15, TpMsgId: 1, windowSz: 0
    c.s.jcsmp.impl.flow.FlowHandleImpl       : tpCreateAck: flow_d: 15, tt_lastInOrderTpMsg: 1, tt_windowSz: 0
    c.s.j.impl.queues.UnackedMessageList2    : UNACKLIST-ack>>> reason=flow-closing flow=15
    c.s.j.protocol.impl.TcpClientChannel     : sendUnbindRequest (smfclient 2) sendAdCtrlRequest Response 0 for flowId 15
    c.s.jcsmp.impl.RequestResponseTask       : RequestResponseTask ([UBRT resource=MESSAGE.IN flowId=15 counter=3]) startTimer
    c.s.jcsmp.impl.flow.UnbindRequestTask    : Executing response handler.
    c.s.jcsmp.impl.RequestResponseTask       : RequestResponseTask ([UBRT resource=MESSAGE.IN flowId=15 counter=3]) cancelTimer
    c.s.jcsmp.impl.flow.UnbindRequestTask    : Got OK unbindresponse, flowId=15
    c.s.j.p.impl.KeepaliveTimeoutHandler     : Keepalive Timeout expired.
    c.s.j.p.impl.KeepaliveTimeoutHandler     : Sending keepalive message (1/10)
    c.s.jcsmp.protocol.impl.TcpChannel       : Scheduling KA timer in 3000ms
    

    I don't see anything more useful than in my original post, but maybe you'll spot something.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 954 admin

    Hi @ejb3,

    I haven't had time to dig into this, but at first glance I think there could be a few things:

    1. Are you using Kotlin? That doesn't look like Java code to me. I haven't actually tried using our binder with Kotlin. In theory it works, though.
    2. Your Spring dependency versions are a bit all over the place. Any reason you're mixing a bunch of different Spring versions? Maybe you can simplify your dependencies to test this issue? Maybe try the ones in the pom in this folder.
    3. Your JCSMP version is quite old at v10.3. We are now on v10.13.0. I don't think this would cause an issue, but it might be worth trying a newer version.

    Hope that helps! I have a pretty packed Thursday but if you don't make progress on it maybe I can have you shoot your project over to me and I can take a look Friday.

  • kimmieer
    kimmieer Member Posts: 6

    Thanks for this interesting solution!

  • ejb3
    ejb3 Member Posts: 6

    Yeah, really interesting solution @kimmieer

    It was an issue with the version numbers; after updating, messages are successfully landing on the DLQ.

    Thanks @marc

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 954 admin

    Glad to hear it worked @ejb3 & @kimmieer :)