Spring Cloud Stream Solace error handling
The codelabs example suggests we shouldn't use the global error handler that Spring recommends, because the binder will already be handling errors.
Is there a recommended way to configure the error handler channel for an application that uses a system property to determine the destination queue so that the application can scale? Ideally we want a catch-all error handler rather than wrapping all composite functions in try/catch.
@ServiceActivator(inputChannel = "a/b/>.clientAck.errors")
public void handleError(ErrorMessage message) {
    logger.info("Binding Specific Error Handler executing business logic for: " + message.toString());
    logger.info("Exception is here: " + message.getPayload());
}
Answers
-
Hi @msharpe,
If I'm understanding correctly you have a JVM system property that you want to use inside of the @ServiceActivator annotation, correct? I haven't tried it, but I'm assuming something like this doesn't work?
@ServiceActivator(inputChannel = "#{systemProperties['your.system.property']}")
If not, can you see if this solution where you add a @Bean that returns PropertySourcesPlaceholderConfigurer works?
Hope that helps!
0 -
Thanks Marc. I've something like the following where I want the error handler to handle everything in one place
definition: f1|f2|fn....
destination: QUEUE_NAME_${PARTITION_ID}
This approach doesn't seem to work, so I'll see if I can work out your other suggestion
@ServiceActivator(inputChannel = "#{systemProperties['your.system.property']}")
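For anyone following along, here is a rough sketch of why the channel name is only known at runtime: the destination contains a ${...} placeholder that is filled from a property. The class and method names below (PlaceholderSketch, resolve) are made up for illustration, and this is plain Java, not how Spring resolves placeholders internally.

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative only: mimics how a ${...} placeholder in a destination name
// turns into a concrete value. This is NOT Spring's placeholder resolver.
final class PlaceholderSketch {
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)\\}");

    // Replaces every ${key} in the template with the matching property value.
    static String resolve(String template, Map<String, String> properties) {
        Matcher matcher = PLACEHOLDER.matcher(template);
        StringBuilder resolved = new StringBuilder();
        while (matcher.find()) {
            String key = matcher.group(1);
            String value = properties.get(key);
            if (value == null) {
                throw new IllegalArgumentException("No value for placeholder: " + key);
            }
            // quoteReplacement keeps '$' and '\' in the value from being interpreted
            matcher.appendReplacement(resolved, Matcher.quoteReplacement(value));
        }
        matcher.appendTail(resolved);
        return resolved.toString();
    }
}
```

With PARTITION_ID set to 3, resolving QUEUE_NAME_${PARTITION_ID} gives QUEUE_NAME_3, so each instance ends up with its own destination and therefore its own error channel name.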
0 -
Hi @msharpe,
I want the error handler to handle everything in one place
Ah okay. Sorry I haven't had time to dig into this. Not sure if I'm missing another option, but I think three options are:
- Implement error handling in one method/function and have a separate @ServiceActivator for each function that calls it.
- Behind the scenes Spring Cloud Stream is taking advantage of Spring Integration channels, which is what we are listening to via the @ServiceActivator. I think it should be possible to bridge all of those into a single channel using Spring Integration. Take a look at @BridgeFrom or Gary's response to this question on stackoverflow: https://stackoverflow.com/questions/41979612/spring-integration-simple-bridge-with-annotation
- Let them go to the error queue and handle them from there (you'd just have to make sure they go to the same error queue).
Hope that helps!
0 -
Hi @marc
- Not sure I completely follow this. When using a service activator I don't know the channel name and can't use a system property, so I'm not sure it solves the issue. Perhaps you can elaborate if I'm missing the point.
- Similarly, I'm not sure I can implement this without knowing the channel/destination name. I tried the example anyway but so far I get an unresolvable circular reference error when creating the service activator, so I haven't proved it out.
- The problem with this approach is that the message isn't enriched with any exception information, so it's not a solution for error handling.
I also looked at using the @Around aspect annotation but we don't have access to the input Message
0 -
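Correction: I can get the Message from joinPoint.getArgs(), so this is probably the best working option I have.
Example
Object handleError(ProceedingJoinPoint joinPoint, String functionName) throws Throwable {
    try {
        return joinPoint.proceed();
    } catch (Exception e) {
        Message<?> m = (Message<?>) joinPoint.getArgs()[0];
        errorHandler.handle(m, e);
        return null; // assumption: nothing is returned once the error is handled
    }
}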
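The same catch-all idea can be sketched without Spring AOP as a plain Java decorator around each function. To be clear, this swaps the aspect for a wrapper and is only an illustration: CatchAllSketch and withErrorHandler are hypothetical names, and the handler here is just a BiConsumer standing in for the errorHandler above.

```java
import java.util.function.BiConsumer;
import java.util.function.Function;

// Illustrative only: a plain-Java stand-in for the @Around advice.
final class CatchAllSketch {
    // Wraps a function so any exception is routed to one shared handler
    // together with the input that caused it.
    static <T, R> Function<T, R> withErrorHandler(
            Function<T, R> delegate, BiConsumer<T, Exception> errorHandler) {
        return input -> {
            try {
                return delegate.apply(input);
            } catch (Exception e) {
                errorHandler.accept(input, e);
                // The exception is swallowed here, so nothing downstream sees the failure.
                return null;
            }
        };
    }
}
```

One thing to keep in mind with either version: because the exception is caught and not rethrown, the caller only sees a null result, so whether the framework's own error handling still runs would need to be checked separately.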
0 -
Hi @msharpe,
Glad to hear that workaround is working for you. Did you verify that when you do that, the exception doesn't still make it to the binder's error handler and get published to the error queue (or requeued to the broker) anyway?
This obviously isn't the cleanest solution so I'll look into requesting an enhancement to the framework itself. I'm thinking it would be nice to specify an errorHandler on the input binding. Something that looks like this:
spring.cloud.stream.bindings.myFunction-in-0.errorHandler=handleError
And then handleError is just the name of a bean in the code that takes in an ErrorMessage. I believe this should allow the developer to also re-use that handleError bean across multiple input bindings as well.
What do you think? Would that satisfy your needs and be much cleaner? Can you think of any downsides?
Hope you have a great weekend.
0 -
Hi @marc
That might be cleaner. However, I think I've found a way to create the service activator programmatically using IntegrationFlows, which I think solves the problem. I hadn't had any experience with the Spring Java DSL until now.
@Bean
public IntegrationFlow errorFlow(@Value("${channelName}") String channelName, ErrorHandler errorHandler) {
    return IntegrationFlows.from(channelName).handle(ErrorMessage.class, (error, headers) -> {
        errorHandler.handle(error);
        return null;
    }).get();
}
0 -
Hi @msharpe,
That looks promising! Did you get it to work? I tried to copy your code to try it out but it wouldn't compile for me. I tried fixing it but think I'm missing something small. If you got it working do you mind sharing? Either in the forum here or maybe just a link to a file on github?
FYI - if you use three backticks in a row in the forum it will format your code to make it easier to read :)
Example:
@Bean
public Consumer<SensorReading> sink() {
    return v -> {
        System.out.println(v);
    };
}
0 -
Hi @marc
Please try the following, just replacing 'function'. It appears to be working as expected from an initial test.
@Bean
public IntegrationFlow errorFlow(
        @Value("${spring.cloud.stream.bindings.function-in-0.destination}") String destination,
        @Value("${spring.cloud.stream.bindings.function-in-0.group}") String group) {
    String channel = destination + "." + group + ".errors";
    return IntegrationFlows.from(channel).handle(ErrorMessage.class, (error, headers) -> {
        log.info("error:" + error.getPayload().getMessage());
        log.info("message:" + new String((byte[]) error.getOriginalMessage().getPayload()));
        return null;
    }).get();
}
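The key step above is building the channel name as destination, then group, then "errors", joined with dots. Pulled out as a small helper it might look like the sketch below. ErrorChannelNames and errorChannelName are hypothetical names, and reading the channel in the original question as destination a/b/> plus group clientAck is my assumption.

```java
import java.util.Objects;

// Illustrative helper: composes the binding-specific error channel name
// in the same way as the snippet above (destination + "." + group + ".errors").
final class ErrorChannelNames {
    static String errorChannelName(String destination, String group) {
        Objects.requireNonNull(destination, "destination");
        Objects.requireNonNull(group, "group");
        return destination + "." + group + ".errors";
    }
}
```

Under that assumption, errorChannelName("a/b/>", "clientAck") produces the a/b/>.clientAck.errors channel used in the first post, and an already-resolved destination such as QUEUE_NAME_3 works the same way.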
1 -
Hey @msharpe,
I opened an enhancement request on the Spring Cloud Stream project that has since been worked and closed. It should make error handling much cleaner since you can specify error handlers via configuration and also don't need to use @ServiceActivator.
When you get a chance take a look and see if it meets your needs. It isn't in a release yet so now is the time to make changes... assuming no issues are found it should be in the next framework release :)
https://github.com/spring-cloud/spring-cloud-stream/issues/2374
0 -
FYI @GreenRover I thought you might want to take a look at this enhancement as well (see link in previous post in this thread). Now is the time to make changes if you see any issues with it.
0