Specify Spring Cloud Stream Error Handlers though application configuration

giri
giri Member, Administrator, Employee Posts: 116 admin
edited August 2023 in General Discussions #1

In the current scheme of things, you would be wiring the error handlers through @ServiceActivator annotation. The annotation parameters helped distinguish global versus binding-specific error handlers. If you notice, the association is captured in the code, and a change would require code-level updates.

In the configuration,

spring:
  cloud:
    function:
      definition: myFunction
    stream:
      bindings:
        myFunction-in-0:
          destination: 'a/b/>'
          group: clientAck

and in the code

@ServiceActivator(inputChannel = "a/b/>.clientAck.errors") 
public void handleError(ErrorMessage message) {
    logger.info("Binding Specific Error Handler executing business logic for: " + message.toString());
    logger.info("Exception is here: " + message.getPayload());
}

It would be nice to specify error handlers at a binding level in the configuration properties that let the framework handle wiring the channels together without you having to parameterize the channel name in the annotation.

@marc and Oleg Zhurakousky of Spring Source had an extended discussion and a proposal for an easier way to bind the error handlers through application configuration. Check out the GitHub discussion for context and details.

The switch to configuration-driven specification came in with Spring Cloud Stream 4 and backported to 3.x (available in 3.5.2) Spring Cloud Stream.

With this, you need to update the dependency with the 3.5.2 version for Spring Cloud Stream in the pom and convert error handlers to plain bean functions (with @Bean annotation)

Here are the steps to affect this in your code:

1) Update the spring cloud stream dependency in the pom

<dependency>
  <groupId>org.springframework.cloud</groupId>
  <artifactId>spring-cloud-stream</artifactId>
  <version>3.2.5</version>
</dependency>

2) Update the application configuration with global and binding-specific error handler function names.

spring:
  cloud:
    function:
      definition: myFunction
      default:
        error-handler-definition: handleGlobalError
    stream:
      bindings:
        myFunction-in-0:
          destination: 'a/b/>'
          group: clientAck
          error-handler-definition: handleBindingError

3) In the code, annotate the function with @Bean

@Bean
public Consumer<ErrorMessage> handleBindingError() {
    return message -> {
        logger.info("Binding Specific Error Handler received msg!" + message.toString());
        logger.info("Original Message: " + message.getOriginalMessage());
    }
}
@Bean
public Consumer<ErrorMessage> handleGlobalError() {
    return message -> {
        logger.info("Global errorChannel received msg!" + message.toString());
        logger.info("Original Message: " + message.getOriginalMessage());
    }
}

This change helps you manage error-handler specification as part of the binding properties in the configuration file, minimizing the code change impacts due to destination name change.

Comments

  • msharpe
    msharpe Member Posts: 17
    edited July 2023 #2

    Hi @giri

    I get a nullpointer using stream bridge moving from 3.2.1 to 3.2.5.

    NullPointerException: null

    ….CloudEventsFunctionInvocationHelper.postProcessResult(CloudEventsFunctionInvocationHelper.java:99)

    ….CloudEventsFunctionInvocationHelper.postProcessResult(CloudEventsFunctionInvocationHelper.java:48)

    …StreamBridge.send(StreamBridge.java:237)

    Is this the correct implementation as it will fail to autowire a bean of type ErrorMessage and other documentation is using a Consumer<ErrorMessage>. Also i thought a bean would have to return an object

  • giri
    giri Member, Administrator, Employee Posts: 116 admin

    Hi @msharpe - I am not sure about the null pointer issue after the migration, may I suggest opening an issue on Spring Cloud Stream project.

    On your observation on auto-wiring a been type ErrorMessage, you have a point. I will check and make corrections as necessary.

This Week's Leaders