Spring Cloud Stream solace error handling

In the codelabs example it suggests we shouldn’t use global error handler as suggested by spring because binder will already be handling it.
Is there a recommendation how you’d configure error handler channel for an application which uses a system property to determine the destination queue in order for the application to scale? Ideally we want a catch all error handler rather than wrapping all composite functions in try/catch.
@ServiceActivator(inputChannel = “a/b/>.clientAck.errors”)
public void handleError(ErrorMessage message) {
logger.info("Binding Specific Error Handler executing business logic for: " + message.toString());
logger.info("Exception is here: " + message.getPayload());
}

Hi @msharpe ,
If I’m understanding correctly you have a JVM system property that you want to use inside of the @ServiceActivator > annotation, correct?
I haven’t tried it, but I’m assuming something like this doesn’t work?
@ServiceActivator(inputChannel = “#{systemProperties[‘your.system.property’]}”)
If not, can you see if this solution where you add a @Bean that returns PropertySourcesPlaceholderConfigurer works?
How to load a System property in Spring using annotations?
I have an issue with Spring, I have to load a system property using an annotation.

I’m trying this approach:

@Value(“${mySystemProperty}”)
private String mySystemPropertyValue;
But when I make t…
Hope that helps!

Thanks Marc. I’ve something like the following where I want the error handler to handle everything in one place
defintion: f1|f2|fn…
destination: QUEUE_NAME_${PARTITION_ID}
This approach doesn’t seem to work so I 'll > see if i can work out your other suggestion
@ServiceActivator(inputChannel = “#{systemProperties[‘your.system.property’]}”)

I guess another option could be to create a consumer function for the error queue
definition: f1|f2|fn…;errors
destination: QUEUE_NAME_${PARTITION_ID}
errors
destination: error/groupname/QUEUE_NAME_${PARTITION_ID}

Hi @msharpe ,
I want the error handler to handle everything in one place Ah okay. Sorry I haven’t had time to dig into this. Not sure if I’m missing another option, but I think three options are:
Implement error handling in one method/function and have a separate @ServiceActivator for each function that calls that Behind the scenes Spring Cloud Stream is taking advantage of Spring Integration channels which is what we are listening to via the @ServiceActivator . I think it should be possible to bridge all of those into a single channel using Spring Integration. Take a look at the @BridgeFrom or Gary’s response to this question on stackoverflow: https://stackoverflow.com/questions/41979612/spring-integration-simple-bridge-with-annotation Let them to go the Error queue and handle them from there (you’d just have to make sure they go to the same error queue)
Hope that helps!

Hi @marc.dipasquale
Not sure i completely follow this, when using service activator i don’t know the channel name and cant use a sys prop so not sure if it solves the issue. Perhaps you can can elaborate if im missing the point Similarly, i’m not sure if i can implement this without knowing the channel/destination name. I tried the example anyway but get unresolvable circular reference error creating service activator so far so haven’t proved it out. The problem with this approach is the message isn’t enriched with any exception information so not a solution for error handling I also looked at using @Around aspect annotation but we don’t have access to the input Message

@marc.dipasquale
correction i can get Message from joinpoint.getArgs so is probably best working option i have.
Example
handleError(ProceedingJoinpoint joinPoint, String functionName)
try{ joinPoint.proceed()
} catch(Exception e)
Message m = ((Message)joinpoint.getargs[0])
errorHandler.handle(m, e)

Hi @msharpe ,
Glad to hear that work around is working for you. Did you verify that when you do that the exception doesn’t still make it to the binder’s error handler and published to the error queue (or requeued to the broker) anyway?
This obviously isn’t the cleanest solution so I’ll look into requesting an enhancement to the framework itself. I’m thinking it would be nice to specify an errorHandler on the input binding. Something that looks like this:
spring.cloud.stream.bindings.myFunction-in-0.errorHandler=handleError
And then handleError is just the name of a bean in the code that takes in an ErrorMessage . I believe this should allow the developer to also re-use that handleError bean across multiple input bindings as well.
What do you think? Would that satisfy your needs and be much cleaner? Can you think of any downsides?

Hope you have a great weekend.

Hi @marc.dipasquale
That might be cleaner however, I think i’ve a way to create the service activator programatically using IntegrationFlows which i think solves the problem, having not had experience of the spring java DSL until now.
@Bean public IntegrationFlow errorFlow (@Value ("${channelName}, ErrorHandler errorHandler) String channel {
return IntegrationFlows.from(channelName).handle(ErrorMessage.class, (error, headers) → {
errorHandler.handle(error)
return null;
}).get();
}

Hi @msharpe > ,
That looks promising! Did you get it to work? I tried to copy your code to try it out but it wouldn’t compile for me. I tried fixing it but think I’m missing something small. If you got it working do you mind sharing? Either in the forum here or maybe just a link to a file on github?
FYI - if you use 3 tics in a row in the forum it will format your code to make it easier to read :slight_smile:
Example:
@Bean

public Consumer sink() {
return v → {
System.out.println(v);
};
}

Hi @marc.dipasquale

Please try the following, just replacing ‘function’. It appears to be working as expected from an initial test.
@Bean
public IntegrationFlow errorFlow(
@Value(“${spring.cloud.stream.bindings.function-in-0.destination}”) String destination,
@Value(“${spring.cloud.stream.bindings.function-in-0.group}”)String group)
{
String channel=destination+“.”+group+“.errors”;
return IntegrationFlows.from(channel).handle(ErrorMessage.class,(error, headers)->{
log.info(“error:”+error.getPayload().getMessage());
log.info(“message:”+new String((byte)error.getOriginalMessage().getPayload()));
return null;
}).get();
}

Hey @msharpe ,
I opened an enhancement request that has since been worked and closed on the Spring Cloud Stream project that should make error handling much cleaner since you can specify error handlers via configuration and also don’t need to use @ServiceActivator When you get a chance take a look and see if it meets your needs. It isn’t in a release yet so now is the time to make changes…assuming no issues are found it should be in the next framework release :slight_smile:
Enhancement to specify error handler via configuration · Issue #2374 · spring-cloud/spring-cloud-stream · GitHub

FYI @GreenRover I thought you might want to take a look at this enhancement as well (see link in previous post in this thread). Now is the time to make changes if you see any issues with it.

@marc.dipasquale the solution looks good to me.