Try PubSub+

How can we achieve exception handling in spring-cloud-stream using solace binder?

We tried following the exception handling as per spring-cloud-stream documentation (https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_consumer_properties). But it is not working as expected. I have attached the spring application yaml for your review to find if something needs to be corrected.
Do you guys have a sample project to refer ?

Answers

  • marcmarc Member, Administrator, Moderator, Employee Posts: 240 admin

    Hi @Karthikaiselvan,
    Is your handler imperative or reactive? I tested the settings out with an imperative function as documented in the reference guide and they seem to work for me w/ the Solace Binder. Note that I also included setting springframework.retry: debug so I can see that the retry lib was indeed using my values.

    Note that we're currently adding some error handling enhancements in v3 of the SCSt binder and I plan on providing some best practices and examples of consumer error handling w/ SCSt and the Solace binder once that is available :)

    Below is what my yaml file looks like:

    spring:
      cloud:
        function:
          definition: processDropoffRideAverages
        stream:
          bindings:
            processDropoffRideAverages-out-0:
              destination: test/taxinyc/foo/ops/ride/updated/v1/stats/dropoff/avg
            processDropoffRideAverages-in-0:
              destination: test/taxinyc/RideDropoffProcessorQueue
              group: foo
              consumer:
                max-attempts: 4
                back-off-initial-interval: 2000
                back-off-max-interval: 30000
                back-off-multiplier: 3
                default-retryable: true
                retryable-exceptions:
                  java.lang.IllegalStateException: false
          solace:
            bindings:
              processDropoffRideAverages-in-0:
                consumer:
                  queueAdditionalSubscriptions: 'taxinyc/ops/ride/updated/v1/dropoff/>'
          binders:
            solace-binder:
              type: solace
              environment:
                solace:
                  java:
                    host: 'taxi.messaging.solace.cloud:55555'
                    msgVpn: nyc-modern-taxi
                    clientUsername: username
                    clientPassword: password
    logging:
      level:
        root: info
        org:
          springframework: info
          springframework.retry: debug
    

    Hope that helps!

  • KarthikaiselvanKarthikaiselvan Member Posts: 2

    @marc , Many thanks for your reply. But, we are using reactive functions.

  • marcmarc Member, Administrator, Moderator, Employee Posts: 240 admin

    Hi @Karthikaiselvan,
    Thanks for the clarification. Per the cloud stream docs, when using reactive functions the framework relies on retryBackoff capabilities for retry which seems to only work with maxAttempts, backOffInitialInterval and backOffMaxInterval. The other settings only apply to RetryTemplate used with imperative functions. I'll reach out to a dev at Spring and see if they have a good example we can point to.

    As a word of warning, you should only use Spring Cloud Stream w/ reactive functions when message loss can be tolerated. At the framework level it hands messages off to the Mono/Flux and ACKs the msg immediately so if your app were to crash the message would already be acknowledged back to the broker and removed from the queue. For this reason you'll see most of our examples/guidance using imperative functions.

  • marcmarc Member, Administrator, Moderator, Employee Posts: 240 admin
    edited January 6

    Hi @Karthikaiselvan,
    I spoke to the cloud stream project lead at Spring and he confirmed my suspicion that the SCSt. Reference Guide is incorrect. Those retry settings do not work with reactive functions. He opened this issue to fix the reference guide. Basically Cloud Stream hands the messages to the Flux and you would need to handle the retry logic inside of the flux using reactor's retry capabilities. I believe this might be a good place to start while they update the docs.

    Hope that helps!

Sign In or Register to comment.