Solace Channel Interceptors

arda
arda Member Posts: 5

I am trying to implement common functionality in my methods marked with @StreamListener.

I tried registering a GlobalChannelInterceptor but found that the postReceive/afterReceive methods are not triggered when a message is received. Only the preSend/postSend/afterSendCompletion callbacks are executed when sending a message.

Any other ways to implement an interceptor for SubscribableChannels?

Answers

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 954 admin

    Hi @arda,
    Cool idea! I hadn't tried using the GlobalChannelInterceptor with the binder before. I believe this would be something within the framework vs. in the Solace Binder. I tried it out and found the same thing as you in that it doens't seem to work for postReceive or afterReceive methods. Can I get you to open an enhancement request on Spring's Cloud Stream repo and see what they say?

    That said, another option for implementing common functionality would be to use function composition which the cloud stream framework does support. Docs here, but I also just added a brand new sample here last week. If you look at the example maybe the preProcess and postProcess are common things that need to be done.

    Hope that helps!

  • GreenRover
    GreenRover Member Posts: 22 ✭✭

    Hi @arda,
    spring works as this point exactly not as expected ,-)
    I you have, like you, a ChannelInterceptor anotated with: @GlobalChannelInterceptor
    You might sink that, "preReceive" will called before a received message is processed. But this is only the case if: Like it would be for Tibco Rendevouz.

    For all "DirectChannel" like it would be used for solace, rabit, ... will be used.
    There your will be called at "preSend" even if only receive. This looks crazy at the beginning ab makes sense after reading the code for some hours.
    For example you can be a processe that receives and send messages like this:

    @Bean public Function<Message, Collection<Message<?>>> processTemperatureSensor(sensorReading) {
    return Arrays.asList(
    busineesCodeA(MessageBuilder.withPayload(sensorReading).build()),
    busineesCodeB(MessageBuilder.withPayload(sensorReading).build());
    );
    }
    Here you do send and received. At this point the same code will be called.
    Meaning the "preSend" will be called 3 times. 1 time for the received message and 2 times for the both send messages.
    Then you can do in preSend ugly thinks like:
    isInbound = ((DirectChannel)channel).getComponentName().endsWith("in-0");

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 954 admin

    Thanks for clarifying this @GreenRover. Definitely weird that it would act that way, but it makes sense b/c the interceptors are probably being triggered based on the underlying spring integration channels the cloud stream framework is using.

    To test it out I ran through your scenario locally and below is what I'm seeing with a simple function that looks like the one @GreenRover shared. So one message in, two out. And you can see the uppercase-in-0 (which is the input channel) is actually triggers the preSend, postSend and afterSendCompletion methods. Although ugly, this can at least be used with some conditional logic. I'll update this thread if improvements are added to the framework to better support this.

        @Bean
        public Function<String, Collection<Message<String>>> uppercase() {
            return v -> {
                System.out.println("Uppercasing: " + v);
                return Arrays.asList(businessCodeA(v),businessCodeB(v));
            };
        }
    

    Logs

    2021-01-25 15:58:05.428[0;39m [32m INFO[0;39m [35m12434[0;39m [2m---[0;39m [2m[pool-2-thread-1][0;39m [36mo.s.i.h.s.MessagingMethodInvokerHelper  [0;39m [2m:[0;39m Overriding default instance of MessageHandlerMethodFactory with provided one.
    Uppercasing: Hello world!
    [2m2021-01-25 15:58:05.439[0;39m [32m INFO[0;39m [35m12434[0;39m [2m---[0;39m [2m[pool-2-thread-1][0;39m [36mc.s.s.spring.scs.ConvertFtoCProcessor   [0;39m [2m:[0;39m In preSend for channel bean 'uppercase-out-0'
    [2m2021-01-25 15:58:05.449[0;39m [32m INFO[0;39m [35m12434[0;39m [2m---[0;39m [2m[pool-2-thread-1][0;39m [36mc.s.s.spring.scs.ConvertFtoCProcessor   [0;39m [2m:[0;39m In postSend for channel bean 'uppercase-out-0'
    [2m2021-01-25 15:58:05.449[0;39m [32m INFO[0;39m [35m12434[0;39m [2m---[0;39m [2m[pool-2-thread-1][0;39m [36mc.s.s.spring.scs.ConvertFtoCProcessor   [0;39m [2m:[0;39m In afterSendCompletion for channel bean 'uppercase-out-0'
    [2m2021-01-25 15:58:05.449[0;39m [32m INFO[0;39m [35m12434[0;39m [2m---[0;39m [2m[pool-2-thread-1][0;39m [36mc.s.s.spring.scs.ConvertFtoCProcessor   [0;39m [2m:[0;39m In preSend for channel bean 'uppercase-out-0'
    [2m2021-01-25 15:58:05.456[0;39m [32m INFO[0;39m [35m12434[0;39m [2m---[0;39m [2m[pool-2-thread-1][0;39m [36mc.s.s.spring.scs.ConvertFtoCProcessor   [0;39m [2m:[0;39m In postSend for channel bean 'uppercase-out-0'
    [2m2021-01-25 15:58:05.456[0;39m [32m INFO[0;39m [35m12434[0;39m [2m---[0;39m [2m[pool-2-thread-1][0;39m [36mc.s.s.spring.scs.ConvertFtoCProcessor   [0;39m [2m:[0;39m In afterSendCompletion for channel bean 'uppercase-out-0'
    [2m2021-01-25 15:58:05.456[0;39m [32m INFO[0;39m [35m12434[0;39m [2m---[0;39m [2m[pool-2-thread-1][0;39m [36mc.s.s.spring.scs.ConvertFtoCProcessor   [0;39m [2m:[0;39m In postSend for channel bean 'uppercase-in-0'
    [2m2021-01-25 15:58:05.456[0;39m [32m INFO[0;39m [35m12434[0;39m [2m---[0;39m [2m[pool-2-thread-1][0;39m [36mc.s.s.spring.scs.ConvertFtoCProcessor   [0;39m [2m:[0;39m In afterSendCompletion for channel bean 'uppercase-in-0'