Spring Cloud Stream Solace error handling

msharpe
msharpe Member Posts: 17

The codelabs example suggests that we shouldn't use the global error handler that Spring recommends, because the binder will already be handling errors.

Is there a recommended way to configure the error handler channel for an application that uses a system property to determine the destination queue so that the application can scale? Ideally we want a catch-all error handler rather than wrapping all composite functions in try/catch.

@ServiceActivator(inputChannel = "a/b/>.clientAck.errors") 
public void handleError(ErrorMessage message) {
    logger.info("Binding Specific Error Handler executing business logic for: " + message.toString());
    logger.info("Exception is here: " + message.getPayload());
}


Answers

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 960 admin
    edited February 2022 #2

    Hi @msharpe,

    If I'm understanding correctly you have a JVM system property that you want to use inside of the @ServiceActivator annotation, correct?

    I haven't tried it, but I'm assuming something like this doesn't work?

    @ServiceActivator(inputChannel = "#{systemProperties['your.system.property']}") 
    

    If not, can you see if this solution where you add a @Bean that returns PropertySourcesPlaceholderConfigurer works?


    Hope that helps!

  • msharpe
    msharpe Member Posts: 17

    Thanks Marc. I have something like the following, where I want the error handler to handle everything in one place:

    definition: f1|f2|fn....

    destination: QUEUE_NAME_${PARTITION_ID}

    This approach doesn't seem to work, so I'll see if I can work out your other suggestion:

    @ServiceActivator(inputChannel = "#{systemProperties['your.system.property']}") 
    


  • msharpe
    msharpe Member Posts: 17

    I guess another option could be to create a consumer function for the error queue

    definition: f1|f2|fn....;errors

    destination: QUEUE_NAME_${PARTITION_ID}

    errors

    destination: error/groupname/QUEUE_NAME_${PARTITION_ID}

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 960 admin

    Hi @msharpe,

    "I want the error handler to handle everything in one place"

    Ah okay. Sorry I haven't had time to dig into this. Not sure if I'm missing another option, but I think three options are:

    1. Implement error handling in one method/function and have a separate @ServiceActivator for each function that calls that
    2. Behind the scenes, Spring Cloud Stream takes advantage of Spring Integration channels, which is what we are listening to via the @ServiceActivator. I think it should be possible to bridge all of those into a single channel using Spring Integration. Take a look at @BridgeFrom or Gary's response to this question on Stack Overflow: https://stackoverflow.com/questions/41979612/spring-integration-simple-bridge-with-annotation
    3. Let them go to the error queue and handle them from there (you'd just have to make sure they go to the same error queue)
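
A minimal plain-Java sketch of option 1, with no Spring types involved. All names here (`SharedErrorHandling`, `handleError`, `handlerFor`, `REPORTS`) are hypothetical and only illustrate the shape: one method holds the error-handling logic, and each function gets a thin entry point that delegates to it. In a Spring application, each entry point would presumably be a `@ServiceActivator` method listening on that function's error channel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical names throughout; this is a sketch, not Spring or Solace API.
class SharedErrorHandling {

    // Collected reports, kept only so the sketch has observable behavior.
    static final List<String> REPORTS = new ArrayList<>();

    // The single place where the error-handling logic lives.
    static void handleError(String source, Throwable cause) {
        REPORTS.add(source + ": " + cause.getMessage());
    }

    // One thin entry point per function; each one only delegates.
    static Consumer<Throwable> handlerFor(String source) {
        return cause -> handleError(source, cause);
    }

    public static void main(String[] args) {
        handlerFor("f1").accept(new IllegalStateException("bad payload"));
        handlerFor("f2").accept(new IllegalArgumentException("missing header"));
        REPORTS.forEach(System.out::println);
    }
}
```

Note that this still requires one entry point per channel, so it does not remove the need to know each channel name.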


    Hope that helps!

  • msharpe
    msharpe Member Posts: 17

    Hi @marc

    1. I'm not sure I completely follow this. When using a service activator I don't know the channel name and can't use a system property, so I'm not sure it solves the issue. Perhaps you can elaborate if I'm missing the point.
    2. Similarly, I'm not sure I can implement this without knowing the channel/destination name. I tried the example anyway, but so far I get an unresolvable circular reference error when creating the service activator, so I haven't proved it out.
    3. The problem with this approach is that the message isn't enriched with any exception information, so it's not a solution for error handling.

    I also looked at using the @Around aspect annotation, but we don't have access to the input Message.

  • msharpe
    msharpe Member Posts: 17

    @marc

    Correction: I can get the Message from joinPoint.getArgs(), so this is probably the best working option I have.

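    Example:

    Object handleError(ProceedingJoinPoint joinPoint, String functionName) throws Throwable {
        try {
            return joinPoint.proceed();
        } catch (Exception e) {
            // The first argument of the intercepted function is the input Message.
            Message<?> m = (Message<?>) joinPoint.getArgs()[0];
            errorHandler.handle(m, e);
            return null; // assumption: the exception is swallowed once handled
        }
    }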
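
The same catch-all idea can be sketched without AOP as a plain-Java decorator. This is only an illustration under stated assumptions: `GuardedFunctions` and `guarded` are hypothetical names, only runtime exceptions are caught, and returning `null` after a failure is a choice made for this sketch, not something the thread specifies.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;

// Hypothetical helper; not part of Spring or the Solace binder.
class GuardedFunctions {

    // Wraps fn so that any runtime exception is passed, together with the
    // input that caused it, to one shared handler. Returning null on failure
    // is an assumption of this sketch.
    static <T, R> Function<T, R> guarded(Function<T, R> fn, BiConsumer<T, Exception> onError) {
        return input -> {
            try {
                return fn.apply(input);
            } catch (RuntimeException e) {
                onError.accept(input, e);
                return null;
            }
        };
    }

    public static void main(String[] args) {
        Function<String, Integer> parse = guarded(
                s -> Integer.parseInt(s),
                (input, e) -> System.out.println("failed on '" + input + "': " + e));
        System.out.println(parse.apply("42"));
        System.out.println(parse.apply("not a number"));
    }
}
```

Each function in a composition would be wrapped once, so the try/catch lives in a single place, and the handler receives both the failing input and the exception.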

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 960 admin
    edited March 2022 #8

    Hi @msharpe,

    Glad to hear that workaround is working for you. Did you verify that, when you do that, the exception doesn't still make it to the binder's error handler and get published to the error queue (or requeued to the broker) anyway?

    This obviously isn't the cleanest solution so I'll look into requesting an enhancement to the framework itself. I'm thinking it would be nice to specify an errorHandler on the input binding. Something that looks like this:

    spring.cloud.stream.bindings.myFunction-in-0.errorHandler=handleError

    And then `handleError` is just the name of a bean in the code that takes in an ErrorMessage. I believe this should also allow the developer to re-use that handleError bean across multiple input bindings.

    What do you think? Would that satisfy your needs and be much cleaner? Can you think of any downsides?


    Hope you have a great weekend.

  • msharpe
    msharpe Member Posts: 17

    Hi @marc

    That might be cleaner. However, I think I have a way to create the service activator programmatically using IntegrationFlows, which I think solves the problem. I had no experience with the Spring Java DSL until now.

    @Bean
    public IntegrationFlow errorFlow(@Value("${channelName}") String channelName, ErrorHandler errorHandler) {
        return IntegrationFlows.from(channelName).handle(ErrorMessage.class, (error, headers) -> {
            errorHandler.handle(error);
            return null;
        }).get();
    }

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 960 admin

    Hi @msharpe,

    That looks promising! Did you get it to work? I tried to copy your code to try it out but it wouldn't compile for me. I tried fixing it but think I'm missing something small. If you got it working do you mind sharing? Either in the forum here or maybe just a link to a file on github?

    FYI - if you use three backticks in a row in the forum, it will format your code to make it easier to read :)

    Example:

    @Bean
    public Consumer<SensorReading> sink() {
       return v -> {
         System.out.println(v);
       };
    }
    


  • msharpe
    msharpe Member Posts: 17

    Hi @marc


    Please try the following, just replacing 'function'. It appears to be working as expected from an initial test.

    @Bean
    public IntegrationFlow errorFlow(
            @Value("${spring.cloud.stream.bindings.function-in-0.destination}") String destination,
            @Value("${spring.cloud.stream.bindings.function-in-0.group}") String group) {
        // Binding-specific error channel: <destination>.<group>.errors
        String channel = destination + "." + group + ".errors";
        return IntegrationFlows.from(channel).handle(ErrorMessage.class, (error, headers) -> {
            log.info("error:" + error.getPayload().getMessage());
            log.info("message:" + new String((byte[]) error.getOriginalMessage().getPayload()));
            return null;
        }).get();
    }
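
To make the channel-name step concrete, here is a plain-Java sketch of what the two `@Value` lookups and the concatenation amount to when the destination is configured as `QUEUE_NAME_${PARTITION_ID}`. The class and method names (`ErrorChannelNames`, `resolve`, `errorChannel`) are hypothetical, the value `3` is just an example, and `resolve` is only a simplified stand-in for Spring's placeholder resolution (no default values, no nesting).

```java
import java.util.Properties;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper; a simplified stand-in, not Spring's actual resolver.
class ErrorChannelNames {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces each ${name} with its value from props; unknown names are left as-is.
    static String resolve(String template, Properties props) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String value = props.getProperty(m.group(1), m.group(0));
            m.appendReplacement(out, Matcher.quoteReplacement(value));
        }
        m.appendTail(out);
        return out.toString();
    }

    // Same concatenation as in the IntegrationFlow bean above.
    static String errorChannel(String destination, String group) {
        return destination + "." + group + ".errors";
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("PARTITION_ID", "3"); // example value only
        String destination = resolve("QUEUE_NAME_${PARTITION_ID}", props);
        System.out.println(errorChannel(destination, "groupname"));
    }
}
```

Because the name is computed at startup from already-resolved properties, each scaled instance ends up listening on the error channel that matches its own queue.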
    


  • marc
    marc Member, Administrator, Moderator, Employee Posts: 960 admin
    edited June 2022 #12

    Hey @msharpe,

    I opened an enhancement request on the Spring Cloud Stream project that has since been worked and closed. It should make error handling much cleaner, since you can specify error handlers via configuration and don't need to use @ServiceActivator. When you get a chance, take a look and see if it meets your needs. It isn't in a release yet, so now is the time to make changes. Assuming no issues are found, it should be in the next framework release :)

    https://github.com/spring-cloud/spring-cloud-stream/issues/2374

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 960 admin
    edited June 2022 #13

    FYI @GreenRover I thought you might want to take a look at this enhancement as well (see link in previous post in this thread). Now is the time to make changes if you see any issues with it.

  • msharpe
    msharpe Member Posts: 17

    @marc the solution looks good to me.