How can we achieve exception handling in spring-cloud-stream using solace binder?

We tried following the exception handling as per spring-cloud-stream documentation (https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_consumer_properties). But it is not working as expected. I have attached the spring application yaml for your review to find if something needs to be corrected.
Do you guys have a sample project to refer ?

Answers

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @Karthikaiselvan,
    Is your handler imperative or reactive? I tested the settings out with an imperative function as documented in the reference guide and they seem to work for me w/ the Solace Binder. Note that I also included setting springframework.retry: debug so I can see that the retry lib was indeed using my values.

    Note that we're currently adding some error handling enhancements in v3 of the SCSt binder and I plan on providing some best practices and examples of consumer error handling w/ SCSt and the Solace binder once that is available :)

    Below is what my yaml file looks like:

    spring:
      cloud:
        function:
          definition: processDropoffRideAverages
        stream:
          bindings:
            processDropoffRideAverages-out-0:
              destination: test/taxinyc/foo/ops/ride/updated/v1/stats/dropoff/avg
            processDropoffRideAverages-in-0:
              destination: test/taxinyc/RideDropoffProcessorQueue
              group: foo
              consumer:
                max-attempts: 4
                back-off-initial-interval: 2000
                back-off-max-interval: 30000
                back-off-multiplier: 3
                default-retryable: true
                retryable-exceptions:
                  java.lang.IllegalStateException: false
          solace:
            bindings:
              processDropoffRideAverages-in-0:
                consumer:
                  queueAdditionalSubscriptions: 'taxinyc/ops/ride/updated/v1/dropoff/>'
          binders:
            solace-binder:
              type: solace
              environment:
                solace:
                  java:
                    host: 'taxi.messaging.solace.cloud:55555'
                    msgVpn: nyc-modern-taxi
                    clientUsername: username
                    clientPassword: password
    logging:
      level:
        root: info
        org:
          springframework: info
          springframework.retry: debug
    

    Hope that helps!

  • Karthikaiselvan
    Karthikaiselvan Member Posts: 5

    @marc , Many thanks for your reply. But, we are using reactive functions.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @Karthikaiselvan,
    Thanks for the clarification. Per the cloud stream docs, when using reactive functions the framework relies on retryBackoff capabilities for retry which seems to only work with maxAttempts, backOffInitialInterval and backOffMaxInterval. The other settings only apply to RetryTemplate used with imperative functions. I'll reach out to a dev at Spring and see if they have a good example we can point to.

    As a word of warning, you should only use Spring Cloud Stream w/ reactive functions when message loss can be tolerated. At the framework level it hands messages off to the Mono/Flux and ACKs the msg immediately so if your app were to crash the message would already be acknowledged back to the broker and removed from the queue. For this reason you'll see most of our examples/guidance using imperative functions.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin
    edited January 2021 #5

    Hi @Karthikaiselvan,
    I spoke to the cloud stream project lead at Spring and he confirmed my suspicion that the SCSt. Reference Guide is incorrect. Those retry settings do not work with reactive functions. He opened this issue to fix the reference guide. Basically Cloud Stream hands the messages to the Flux and you would need to handle the retry logic inside of the flux using reactor's retry capabilities. I believe this might be a good place to start while they update the docs.

    Hope that helps!

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @Karthikaiselvan, I just wanted to let you know that the team over at Spring added the docs and closed the issue so hopefully this will be more clear going forward.

  • Karthikaiselvan
    Karthikaiselvan Member Posts: 5

    Thanks @marc. I will have a look at it.

  • Karthikaiselvan
    Karthikaiselvan Member Posts: 5

    @marc said:
    Hi @Karthikaiselvan,
    Thanks for the clarification. Per the cloud stream docs, when using reactive functions the framework relies on retryBackoff capabilities for retry which seems to only work with maxAttempts, backOffInitialInterval and backOffMaxInterval. The other settings only apply to RetryTemplate used with imperative functions. I'll reach out to a dev at Spring and see if they have a good example we can point to.

    As a word of warning, you should only use Spring Cloud Stream w/ reactive functions when message loss can be tolerated. At the framework level it hands messages off to the Mono/Flux and ACKs the msg immediately so if your app were to crash the message would already be acknowledged back to the broker and removed from the queue. For this reason you'll see most of our examples/guidance using imperative functions.

    if message loss can't be tolerated, is there a solution to handle it in reactive functions? or the only option is to use the imperative functions.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    @Karthikaiselvan said:

    @marc said:
    Hi @Karthikaiselvan,
    Thanks for the clarification. Per the cloud stream docs, when using reactive functions the framework relies on retryBackoff capabilities for retry which seems to only work with maxAttempts, backOffInitialInterval and backOffMaxInterval. The other settings only apply to RetryTemplate used with imperative functions. I'll reach out to a dev at Spring and see if they have a good example we can point to.

    As a word of warning, you should only use Spring Cloud Stream w/ reactive functions when message loss can be tolerated. At the framework level it hands messages off to the Mono/Flux and ACKs the msg immediately so if your app were to crash the message would already be acknowledged back to the broker and removed from the queue. For this reason you'll see most of our examples/guidance using imperative functions.

    if message loss can't be tolerated, is there a solution to handle it in reactive functions? or the only option is to use the imperative functions.

    Hi @Karthikaiselvan, the only option would be imperative functions. Note that this is out of the Solace binder's control and at the framework level.

  • Karthikaiselvan
    Karthikaiselvan Member Posts: 5

    Hi @marc ,

    Greetings!

    Any luck on the below ?

    Note that we're currently adding some error handling enhancements in v3 of the SCSt binder and I plan on providing some best practices and examples of consumer error handling w/ SCSt and the Solace binder once that is available :)

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @Karthikaiselvan,

    Unfortunately I haven't had a chance to put together best practices yet.

    The super short version would be if you want to have the most control over error scenarios then do the following:

    • Use imperative, not reactive for reasons discussed earlier in this thread
    • Configure the Spring Retry template
    • Use the Spring Cloud Stream Consumer Group Pattern (creates durable queue in Solace)
    • Familiarize yourself with the Consumer Message Error Handling options.
    • Configure your queues ahead of time or use a Solace queue template. Pay attention to Max-TTL & configure a Dead Message Queue (DMQ)
    • Set autoBindErrorQueue to true
    • Use Client/Manual Acknowledgements
      • Accept messages that are properly processed!
      • Reject messages that your app has no clue what to do with and retrying wouldn't help. For example, there is vital missing information or the message wasn't parseable. This will route them to the error queue for further processing by another app or maybe manual intervention?
      • Requeue messages that you think would be processed successfully if retried. This would usually be due to infrastructure issues, for example maybe a downstream service wasn't responding or your instance of a microservice couldn't reach the database. Once max-retries or the TTL on a message are reached then it would fall to the DMQ for further troubleshooting.

    Hope that helps!