Solace Spring cloud stream and opentracing

Mike13
Mike13 Member Posts: 29

Hi there
I am trying to introduce tracing in my project. For that I added the following dependency:



<dependency>
  <groupId>io.opentracing.contrib</groupId>
  <artifactId>opentracing-spring-jaeger-cloud-starter</artifactId>
  <version>3.1.2</version>
</dependency>


There are starters for Kafka, RabbitMQ and others, but unfortunately none for Solace.
Now I want to implement it on my own.
What would be the best hook for a ReceiveTracingInterceptor and a SendTracingAspect?

My spring-cloud-starter-stream-solace version is 2.1.1

Comments

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 972 admin

    Hi @Mike13,
    This sounds awesome! I love the idea of using open tracing as I think something like that is definitely necessary to really understand where your events flow at runtime. That said, I haven't gotten a chance to dig into it myself. Can you share or point me to more info on what's required for a "hook" to be used? I think that will give me enough info to be dangerous!

    Thanks!

  • Mike13
    Mike13 Member Posts: 29

    Hi @marc
    Here is some information about what happens when I start the application after doing nothing but adding the Jaeger dependency:
    The io.opentracing.contrib.spring.integration.messaging.OpenTracingChannelInterceptor#OpenTracingChannelInterceptor from https://github.com/opentracing-contrib/java-spring-messaging/tree/master/opentracing-spring-messaging enriches the Message with the following headers:

    These headers lead to problems in com.solace.spring.cloud.stream.binder.util.XMLMessageMapper#addSDTMapObject, because the ThreadLocalScope is not an instance of Serializable.

    Here you can find how they do it for rabbitmq:
    https://github.com/opentracing-contrib/java-spring-rabbitmq/blob/master/opentracing-spring-rabbitmq/src/main/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqSendTracingAspect.java
    https://github.com/opentracing-contrib/java-spring-rabbitmq/blob/master/opentracing-spring-rabbitmq/src/main/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqReceiveTracingInterceptor.java
    or for jms:
    https://github.com/opentracing-contrib/java-jms/tree/master/opentracing-jms-spring/src/main/java/io/opentracing/contrib/jms/spring

    Best regards
    Mike

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 972 admin

    Hi @Mike13,
    It's been a crazy week, so I'm just circling back to this. You're indeed correct that right now the Solace binder only supports headers that either implement Serializable or are directly compatible with JCSMP's SDTMap. One reason is that the Spring serializer we use for our message headers requires it (its ObjectOutputStream also enforces an implementation of Serializable). We're looking at potentially adding an option in a future version of the binder to get around that, similar to what the rabbit binder does, but as with the rabbit binder it would likely come at the cost of not being able to map the object back to its original form on the consumer side.

    That said, do you think there is some way to wrap the non-serializable object in a Serializable one?
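To make the wrapping idea concrete, here is a minimal sketch that uses only plain java.io serialization, not the binder's own code path. It assumes a holder that is itself Serializable but marks its payload as transient, so ObjectOutputStream accepts the holder while the payload is never written. The names TransientHeader, TransientHeaderDemo, FakeScope, isWritable and roundTrip are hypothetical and exist only for this illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

final class TransientHeaderDemo {

    /** Hypothetical stand-in for a non-serializable header value such as a tracing scope. */
    static final class FakeScope { }

    /** Returns true if Java serialization accepts the object, false if it is rejected. */
    static boolean isWritable(Object candidate) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(candidate);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Serializes and deserializes the object, imitating a producer-to-consumer hop. */
    @SuppressWarnings("unchecked")
    static <T> T roundTrip(T original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(original);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeScope scope = new FakeScope();
        System.out.println("plain scope writable:   " + isWritable(scope));
        TransientHeader<FakeScope> wrapped = new TransientHeader<>(scope);
        System.out.println("wrapped scope writable: " + isWritable(wrapped));
        System.out.println("payload after hop null: " + (roundTrip(wrapped).get() == null));
    }
}

/** Hypothetical wrapper: Serializable itself, but the payload is transient and never written. */
final class TransientHeader<T> implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient T value;

    TransientHeader(T value) {
        this.value = value;
    }

    /** The wrapped value; null once the wrapper has been deserialized. */
    T get() {
        return value;
    }
}
```

The trade-off matches the one described above: the wrapper passes the Serializable check, but the payload is gone after deserialization, so the consumer side cannot recover the original object.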

    As far as where to hook in I would suggest:

    Hope that helps! I'm definitely interested in following where this goes so I hope you'll be able to create the solace starter as an open source project. If so please respond with the repo below so I can watch/contribute!

    thanks!

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 972 admin

    More info that might be useful: under the covers, our Spring Cloud Stream binder uses JCSMP (our Java API).
    There are a few JCSMP send methods, which you can see documented here, although the cloud stream binder only uses the one I linked to in the last post. On the receiving side you can either take a synchronous polling approach (like our binder does) or register a listener that receives an asynchronous callback in the onReceive method documented here.

  • Mike13
    Mike13 Member Posts: 29

    Hi @marc
    Thank you very much for the reply! I had kind of a crazy week as well. I didn't manage to go further with this "project".
    But I hope I can dive further into the topic this week :-).

  • Mike13
    Mike13 Member Posts: 29

    Hi @marc
    I have finally found some time and could continue working on this project. I didn't manage to intercept InboundXMLMessageListener.handleMessage because the JCSMPInboundChannelAdapter.buildListener method is private. But I did this:

    @Configuration
    @EnableConfigurationProperties({SolaceExtendedBindingProperties.class})
    @ComponentScan(excludeFilters = {
            @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = {SolaceMessageChannelBinderConfiguration.class})})
    public class SolaceMessageChannelBinderTracingConfiguration {
    
        private static final Logger LOG = LoggerFactory.getLogger(SolaceMessageChannelBinderTracingConfiguration.class);
    
        private final SpringJCSMPFactory springJCSMPFactory;
        private final SolaceExtendedBindingProperties solaceExtendedBindingProperties;
    
        private JCSMPSession jcsmpSession;
    
        public SolaceMessageChannelBinderTracingConfiguration(
                final SpringJCSMPFactory springJCSMPFactory,
                final SolaceExtendedBindingProperties solaceExtendedBindingProperties) {
            this.springJCSMPFactory = springJCSMPFactory;
            this.solaceExtendedBindingProperties = solaceExtendedBindingProperties;
        }
    
        @PostConstruct
        private void initSession() throws JCSMPException {
            jcsmpSession = springJCSMPFactory.createSession();
            LOG.info(String.format("Connecting JCSMP session %s", jcsmpSession.getSessionName()));
            jcsmpSession.connect();
        }
    
        @Bean
        SolaceMessageChannelBinder solaceMessageChannelBinder(
                final SolaceOutboundMessageTracingAspect outputMessageTracingAspect,
                final SolaceInboundMessageTracingAspect inboundMessageTracingAspect) {
            final SolaceMessageChannelBinder binder = new SolaceMessageChannelBinder(jcsmpSession, provisioningProvider()) {
                @Override
                protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<SolaceProducerProperties> producerProperties,
                        MessageChannel errorChannel) {
                    final MessageHandler messageHandler = super.createProducerMessageHandler(destination, producerProperties, errorChannel);
                    return ProxyUtil.createProxy(messageHandler, outputMessageTracingAspect);
                }
    
                @Override
                protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<SolaceConsumerProperties> properties) {
                    final MessageProducer messageProducer = super.createConsumerEndpoint(destination, group, properties);
                    return ProxyUtil.createProxy(messageProducer, inboundMessageTracingAspect);
                }
            };
            binder.setExtendedBindingProperties(solaceExtendedBindingProperties);
            return binder;
        }
    
        @Bean
        SolaceQueueProvisioner provisioningProvider() {
            return new SolaceQueueProvisioner(jcsmpSession);
        }
    }
    

    With the outputMessageTracingAspect I intercept org.springframework.messaging.MessageHandler.handleMessage to decorate the message with tracing information.
    With the inboundMessageTracingAspect I intercept org.springframework.integration.core.MessageProducer.setOutputChannel to adapt the MessageChannel. The adapter extracts the tracing information from the message before delegating to the real MessageChannel.
    But now I have to dive into the OpenTracing topic to get the tracing implementation right.
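The interception idea described above can be sketched with a plain JDK dynamic proxy. This is an illustration only, under stated assumptions: the Handler interface, the trace-id header name and all helper names are hypothetical stand-ins, not the Spring MessageHandler, the ProxyUtil from the post, or the OpenTracing API. Headers are kept as plain strings here, which also sidesteps the Serializable problem discussed earlier in the thread.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.HashMap;
import java.util.Map;

final class TracingProxyDemo {

    /** Hypothetical stand-in for a message handler; not the Spring interface. */
    interface Handler {
        void handle(Map<String, String> headers, String payload);
    }

    /** Hypothetical header name used to carry the trace identifier. */
    static final String TRACE_HEADER = "trace-id";

    /** Wraps a handler so each handle() call sees a trace id header, keeping an existing one. */
    static Handler withTraceHeader(Handler target, String traceId) {
        InvocationHandler interceptor = (proxy, method, args) -> {
            if (method.getName().equals("handle")) {
                @SuppressWarnings("unchecked")
                Map<String, String> headers = (Map<String, String>) args[0];
                // Copy the headers so the caller's map is left untouched.
                Map<String, String> enriched = new HashMap<>(headers);
                enriched.putIfAbsent(TRACE_HEADER, traceId);
                args[0] = enriched;
            }
            return method.invoke(target, args);
        };
        return (Handler) Proxy.newProxyInstance(
                Handler.class.getClassLoader(), new Class<?>[] {Handler.class}, interceptor);
    }

    /** Sends one message through the proxy and returns the headers the real handler saw. */
    static Map<String, String> sendThroughProxy(Map<String, String> headers, String traceId) {
        Map<String, String> seen = new HashMap<>();
        Handler real = (h, payload) -> seen.putAll(h);
        withTraceHeader(real, traceId).handle(headers, "hello");
        return seen;
    }

    public static void main(String[] args) {
        Map<String, String> seen = sendThroughProxy(new HashMap<>(), "abc123");
        System.out.println(seen.get(TRACE_HEADER));
    }
}
```

On the consuming side the same pattern would apply in reverse: a wrapper reads the header from the incoming message before delegating to the real channel.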

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 972 admin
    edited August 2020 #8

    Awesome, thanks for sharing @Mike13. It looks like you're on the right track 🥳! Is the code on github so I can take a closer look at some point?

    Also if you don't mind me asking how did you get around the non-serializable header issue?

  • Mike13
    Mike13 Member Posts: 29

    Hi @marc
    I just put the code on github.
    The project https://github.com/MikeR13/esta-spring-cloud-stream-solace contains an example app for spring cloud stream with solace and I also put the code for the opentracing "project" there (https://github.com/MikeR13/esta-spring-cloud-stream-solace/tree/master/src/main/java/io/opentracing/contrib downwards). But please do not look too closely, it is a draft ;-)
    Regarding the "non-serializable header issue":
    I asked a question about it on Stack Overflow that is still open: https://stackoverflow.com/questions/63491625/opentracing-spring-cloud-stream-and-solace .

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 972 admin
    edited August 2020 #10

    Great, thanks @Mike13! I'll read it as a Draft :)

  • Mike13
    Mike13 Member Posts: 29

    Hi @marc
    I was on vacation, so I didn't do much more on the subject. The only thing I found out is that this approach does not work if I define another binder in addition to the Solace binder. I hope to be able to continue here soon....

  • Mike13
    Mike13 Member Posts: 29

    Hi @marc
    This approach does not work when there are several Binders in a project (with this approach only the default config is applied):

    @Configuration
    @ConditionalOnClass({XMLMessage.class, SolaceMessageChannelBinder.class})
    @ConditionalOnBean(Tracer.class)
    @AutoConfigureAfter(TracerAutoConfiguration.class)
    @ConditionalOnProperty(name = "opentracing.spring.cloud.solace.enabled", havingValue = "true", matchIfMissing = true)
    @Import(SolaceMessageChannelBinderTracingConfiguration.class)
    public class SolaceAutoConfiguration {
    
    }
    
    @Configuration
    @EnableConfigurationProperties({SolaceExtendedBindingProperties.class})
    @ComponentScan(excludeFilters = {
            @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, value = {SolaceMessageChannelBinderConfiguration.class})})
    public class SolaceMessageChannelBinderTracingConfiguration {
    
        private static final Logger LOG = LoggerFactory.getLogger(SolaceMessageChannelBinderTracingConfiguration.class);
    
        private final SpringJCSMPFactory springJCSMPFactory;
        private final SolaceExtendedBindingProperties solaceExtendedBindingProperties;
    
        private JCSMPSession jcsmpSession;
    
        public SolaceMessageChannelBinderTracingConfiguration(
                final SpringJCSMPFactory springJCSMPFactory,
                final SolaceExtendedBindingProperties solaceExtendedBindingProperties) {
            this.springJCSMPFactory = springJCSMPFactory;
            this.solaceExtendedBindingProperties = solaceExtendedBindingProperties;
        }
    
        @PostConstruct
        private void initSession() throws JCSMPException {
            jcsmpSession = springJCSMPFactory.createSession();
            LOG.info(String.format("Connecting JCSMP session %s", jcsmpSession.getSessionName()));
            jcsmpSession.connect();
        }
    
        @Bean
        JCSMPSession jcsmpSession() {
            return jcsmpSession;
        }
    
        @Bean
        @Primary
        public SolaceMessageChannelBinder solaceMessageChannelBinder(
                final SolaceOutboundMessageTracingAspect outputMessageTracingAspect,
                final SolaceInboundMessageTracingAspect inboundMessageTracingAspect) {
            final SolaceMessageChannelBinder binder = new SolaceMessageChannelBinder(jcsmpSession, provisioningProvider()) {
                @Override
                protected MessageHandler createProducerMessageHandler(ProducerDestination destination, ExtendedProducerProperties<SolaceProducerProperties> producerProperties,
                                                                      MessageChannel errorChannel) {
                    final MessageHandler messageHandler = super.createProducerMessageHandler(destination, producerProperties, errorChannel);
                    return ProxyUtil.createProxy(messageHandler, outputMessageTracingAspect);
                }
    
                @Override
                protected MessageProducer createConsumerEndpoint(ConsumerDestination destination, String group, ExtendedConsumerProperties<SolaceConsumerProperties> properties) {
                    final MessageProducer messageProducer = super.createConsumerEndpoint(destination, group, properties);
                    return ProxyUtil.createProxy(messageProducer, inboundMessageTracingAspect);
                }
            };
            binder.setExtendedBindingProperties(solaceExtendedBindingProperties);
            return binder;
        }
    
        @Bean
        SolaceQueueProvisioner provisioningProvider() {
            return new SolaceQueueProvisioner(jcsmpSession);
        }
    }
    

    Do you have any suggestions on how to do it right?

    Thanks and best regards
    Mike