Solace Spring Cloud Stream and OpenTracing
Hi there
I am trying to introduce tracing in my project. For that, I added the following dependency:
io.opentracing.contrib:opentracing-spring-jaeger-cloud-starter:3.1.2
There are starters for Kafka, RabbitMQ, and others, but unfortunately none for Solace.
Now I want to implement it on my own.
What would be the best hook for a ReceiveTracingInterceptor and a SendTracingAspect?
My spring-cloud-starter-stream-solace version is 2.1.1
Comments
-
Hi @Mike13,
This sounds awesome! I love the idea of using OpenTracing, as I think something like that is definitely necessary to really understand where your events flow at runtime. That said, I haven't gotten a chance to dig into it myself. Can you share or point me to more info on what's required for a "hook" to be used? I think that will give me enough info to be dangerous!
Thanks!
-
Hi @marc
Here is some information on what happens when I start it after doing nothing but adding the Jaeger dependency:
The io.opentracing.contrib.spring.integration.messaging.OpenTracingChannelInterceptor#OpenTracingChannelInterceptor from https://github.com/opentracing-contrib/java-spring-messaging/tree/master/opentracing-spring-messaging enriches the Message with additional headers. These lead to problems in com.solace.spring.cloud.stream.binder.util.XMLMessageMapper#addSDTMapObject, because the ThreadLocalScope is not an instance of Serializable.
Here you can find how they do it for RabbitMQ:
https://github.com/opentracing-contrib/java-spring-rabbitmq/blob/master/opentracing-spring-rabbitmq/src/main/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqSendTracingAspect.java
https://github.com/opentracing-contrib/java-spring-rabbitmq/blob/master/opentracing-spring-rabbitmq/src/main/java/io/opentracing/contrib/spring/rabbitmq/RabbitMqReceiveTracingInterceptor.java
or for jms:
https://github.com/opentracing-contrib/java-jms/tree/master/opentracing-jms-spring/src/main/java/io/opentracing/contrib/jms/spring
Best regards
Mike
-
Hi @Mike13,
It's been a crazy week, so I'm just circling back to this. You're indeed correct that right now the Solace binder only supports headers which either implement Serializable or are directly compatible with JCSMP's SDTMap. One of the reasons for this is that it is required by the Spring serializer that we use for our message headers (whose ObjectOutputStream also enforces an implementation of Serializable). We're looking at potentially adding an option (similar to what the rabbit binder does) to get around that in a future version of the binder, but, as with the rabbit binder, it would likely come at the cost of not being able to map the object back to its original form on the consumer side.
That said, do you think there is some way to wrap the non-serializable object with a Serializable?
As far as where to hook in, I would suggest:
- Receiving Side: handleMessage
- Sending Side: producer.send
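To make the Serializable constraint concrete, here is a minimal sketch using only the JDK. It is not the binder's code: FakeScope is a made-up stand-in for a non-serializable tracing object, the header names are illustrative, and keepSerializable is a hypothetical helper that simply drops headers Java serialization rejects (it filters rather than wraps them).

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.LinkedHashMap;
import java.util.Map;

class SerializableHeaderFilter {

    /** Hypothetical stand-in for a tracing scope that does not implement Serializable. */
    static final class FakeScope {
    }

    /** Returns true if Java serialization (ObjectOutputStream) accepts the value. */
    static boolean canSerialize(Object value) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(value);
            return true;
        } catch (NotSerializableException e) {
            // Thrown when the value, or something it references, is not Serializable.
            return false;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Copies only the headers whose values survive Java serialization. */
    static Map<String, Object> keepSerializable(Map<String, Object> headers) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (canSerialize(entry.getValue())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("uber-trace-id", "abc:def:0:1");                // plain String, serializable
        headers.put("hypothetical-scope-header", new FakeScope()); // not serializable
        System.out.println(keepSerializable(headers).keySet());    // prints [uber-trace-id]
    }
}
```

Filtering like this loses the dropped objects entirely, so it only helps if the trace context also travels as String headers.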
Hope that helps! I'm definitely interested in following where this goes so I hope you'll be able to create the solace starter as an open source project. If so please respond with the repo below so I can watch/contribute!
thanks!
-
More info that might be useful: under the covers, our Spring Cloud Stream Binder uses JCSMP (our Java API).
There are a few JCSMP send methods, which you can see documented here, although the cloud stream binder only uses the one I linked to in the last post. On the receiving side, you can take a synchronous polling approach (like our binder does) or register a listener that receives an asynchronous callback in the onReceive method documented here.
-
Hi @marc
I have finally found some time and could continue working on this project. I didn't manage to intercept InboundXMLMessageListener.handleMessage because the JCSMPInboundChannelAdapter.buildListener method is private. But I did this:

    @Configuration
    @EnableConfigurationProperties({SolaceExtendedBindingProperties.class})
    @ComponentScan(excludeFilters = {
            @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE,
                    value = {SolaceMessageChannelBinderConfiguration.class})})
    public class SolaceMessageChannelBinderTracingConfiguration {

        private static final Logger LOG =
                LoggerFactory.getLogger(SolaceMessageChannelBinderTracingConfiguration.class);

        private final SpringJCSMPFactory springJCSMPFactory;
        private final SolaceExtendedBindingProperties solaceExtendedBindingProperties;
        private JCSMPSession jcsmpSession;

        public SolaceMessageChannelBinderTracingConfiguration(
                final SpringJCSMPFactory springJCSMPFactory,
                final SolaceExtendedBindingProperties solaceExtendedBindingProperties) {
            this.springJCSMPFactory = springJCSMPFactory;
            this.solaceExtendedBindingProperties = solaceExtendedBindingProperties;
        }

        @PostConstruct
        private void initSession() throws JCSMPException {
            jcsmpSession = springJCSMPFactory.createSession();
            LOG.info(String.format("Connecting JCSMP session %s", jcsmpSession.getSessionName()));
            jcsmpSession.connect();
        }

        @Bean
        SolaceMessageChannelBinder solaceMessageChannelBinder(
                final SolaceOutboundMessageTracingAspect outputMessageTracingAspect,
                final SolaceInboundMessageTracingAspect inboundMessageTracingAspect) {
            final SolaceMessageChannelBinder binder =
                    new SolaceMessageChannelBinder(jcsmpSession, provisioningProvider()) {

                // Sending side: proxy the handler so the aspect can decorate outgoing messages.
                @Override
                protected MessageHandler createProducerMessageHandler(
                        ProducerDestination destination,
                        ExtendedProducerProperties<SolaceProducerProperties> producerProperties,
                        MessageChannel errorChannel) {
                    final MessageHandler messageHandler =
                            super.createProducerMessageHandler(destination, producerProperties, errorChannel);
                    return ProxyUtil.createProxy(messageHandler, outputMessageTracingAspect);
                }

                // Receiving side: proxy the endpoint so the aspect can adapt its output channel.
                @Override
                protected MessageProducer createConsumerEndpoint(
                        ConsumerDestination destination,
                        String group,
                        ExtendedConsumerProperties<SolaceConsumerProperties> properties) {
                    final MessageProducer messageProducer =
                            super.createConsumerEndpoint(destination, group, properties);
                    return ProxyUtil.createProxy(messageProducer, inboundMessageTracingAspect);
                }
            };
            binder.setExtendedBindingProperties(solaceExtendedBindingProperties);
            return binder;
        }

        @Bean
        SolaceQueueProvisioner provisioningProvider() {
            return new SolaceQueueProvisioner(jcsmpSession);
        }
    }

With the outboundMessageTracingAspect I intercept org.springframework.messaging.MessageHandler.handleMessage to decorate the message with tracing information.
With the inboundMessageTracingAspect I intercept org.springframework.integration.core.MessageProducer.setOutputChannel to adapt the MessageChannel. The adapter extracts the tracing information out of the message before delegating to the real messageChannel.
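The two interception points can be illustrated with a small, self-contained sketch that uses only the JDK. Nothing here is the Spring, Solace or OpenTracing API: Msg and Handler are simplified stand-ins for Spring's Message and MessageHandler, the header name x-trace-id is hypothetical, and the inbound side is reduced to a wrapping handler rather than an adapted MessageChannel.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TracingProxySketch {

    /** Simplified stand-in for a Spring message: a payload plus String headers. */
    static final class Msg {
        final String payload;
        final Map<String, String> headers;

        Msg(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = Collections.unmodifiableMap(new HashMap<>(headers));
        }
    }

    /** Simplified stand-in for org.springframework.messaging.MessageHandler. */
    interface Handler {
        void handleMessage(Msg message);
    }

    /** Hypothetical header name, chosen only for this example. */
    static final String TRACE_HEADER = "x-trace-id";

    /** Sending side: a dynamic proxy that adds a trace header before delegating. */
    static Handler tracingSender(Handler target, String traceId) {
        InvocationHandler interceptor = (proxy, method, args) -> {
            if (method.getName().equals("handleMessage") && args != null && args.length == 1) {
                Msg original = (Msg) args[0];
                Map<String, String> headers = new HashMap<>(original.headers);
                headers.put(TRACE_HEADER, traceId);
                target.handleMessage(new Msg(original.payload, headers));
                return null;
            }
            // Anything else (toString, hashCode, ...) goes straight to the target.
            return method.invoke(target, args);
        };
        return (Handler) Proxy.newProxyInstance(
                Handler.class.getClassLoader(), new Class<?>[] {Handler.class}, interceptor);
    }

    /** Receiving side: reads the trace header, records it, then delegates. */
    static Handler tracingReceiver(Handler delegate, List<String> seenTraceIds) {
        return message -> {
            String traceId = message.headers.get(TRACE_HEADER);
            if (traceId != null) {
                seenTraceIds.add(traceId);
            }
            delegate.handleMessage(message);
        };
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        Handler business = m -> System.out.println("received " + m.payload);
        Handler producer = tracingSender(tracingReceiver(business, seen), "trace-1");
        producer.handleMessage(new Msg("hello", new HashMap<>()));
        System.out.println(seen);
    }
}
```

In a real implementation, the trace identifier would come from the active span via the tracer's inject and extract operations instead of a fixed string.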
But now I have to dive into the opentracing topic to get the tracing implementation right.
-
Awesome, thanks for sharing @Mike13. It looks like you're on the right track 🥳! Is the code on github so I can take a closer look at some point?
Also if you don't mind me asking how did you get around the non-serializable header issue?
-
Hi @marc
I just put the code on github.
The project https://github.com/MikeR13/esta-spring-cloud-stream-solace contains an example app for spring cloud stream with solace and I also put the code for the opentracing "project" there (https://github.com/MikeR13/esta-spring-cloud-stream-solace/tree/master/src/main/java/io/opentracing/contrib downwards). But please do not look too closely, it is a draft ;-)
Regarding the "non-serializable header issue":
I have asked a question on Stack Overflow that is still open: https://stackoverflow.com/questions/63491625/opentracing-spring-cloud-stream-and-solace
-
Hi @marc
This approach does not work when there are several binders in a project (with this approach, only the default config is applied):

    @Configuration
    @ConditionalOnClass({XMLMessage.class, SolaceMessageChannelBinder.class})
    @ConditionalOnBean(Tracer.class)
    @AutoConfigureAfter(TracerAutoConfiguration.class)
    @ConditionalOnProperty(name = "opentracing.spring.cloud.solace.enabled",
            havingValue = "true", matchIfMissing = true)
    @Import(SolaceMessageChannelBinderTracingConfiguration.class)
    public class SolaceAutoConfiguration {
    }

    @Configuration
    @EnableConfigurationProperties({SolaceExtendedBindingProperties.class})
    @ComponentScan(excludeFilters = {
            @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE,
                    value = {SolaceMessageChannelBinderConfiguration.class})})
    public class SolaceMessageChannelBinderTracingConfiguration {

        private static final Logger LOG =
                LoggerFactory.getLogger(SolaceMessageChannelBinderTracingConfiguration.class);

        private final SpringJCSMPFactory springJCSMPFactory;
        private final SolaceExtendedBindingProperties solaceExtendedBindingProperties;
        private JCSMPSession jcsmpSession;

        public SolaceMessageChannelBinderTracingConfiguration(
                final SpringJCSMPFactory springJCSMPFactory,
                final SolaceExtendedBindingProperties solaceExtendedBindingProperties) {
            this.springJCSMPFactory = springJCSMPFactory;
            this.solaceExtendedBindingProperties = solaceExtendedBindingProperties;
        }

        @PostConstruct
        private void initSession() throws JCSMPException {
            jcsmpSession = springJCSMPFactory.createSession();
            LOG.info(String.format("Connecting JCSMP session %s", jcsmpSession.getSessionName()));
            jcsmpSession.connect();
        }

        @Bean
        JCSMPSession jcsmpSession() {
            return jcsmpSession;
        }

        @Bean
        @Primary
        public SolaceMessageChannelBinder solaceMessageChannelBinder(
                final SolaceOutboundMessageTracingAspect outputMessageTracingAspect,
                final SolaceInboundMessageTracingAspect inboundMessageTracingAspect) {
            final SolaceMessageChannelBinder binder =
                    new SolaceMessageChannelBinder(jcsmpSession, provisioningProvider()) {

                @Override
                protected MessageHandler createProducerMessageHandler(
                        ProducerDestination destination,
                        ExtendedProducerProperties<SolaceProducerProperties> producerProperties,
                        MessageChannel errorChannel) {
                    final MessageHandler messageHandler =
                            super.createProducerMessageHandler(destination, producerProperties, errorChannel);
                    return ProxyUtil.createProxy(messageHandler, outputMessageTracingAspect);
                }

                @Override
                protected MessageProducer createConsumerEndpoint(
                        ConsumerDestination destination,
                        String group,
                        ExtendedConsumerProperties<SolaceConsumerProperties> properties) {
                    final MessageProducer messageProducer =
                            super.createConsumerEndpoint(destination, group, properties);
                    return ProxyUtil.createProxy(messageProducer, inboundMessageTracingAspect);
                }
            };
            binder.setExtendedBindingProperties(solaceExtendedBindingProperties);
            return binder;
        }

        @Bean
        SolaceQueueProvisioner provisioningProvider() {
            return new SolaceQueueProvisioner(jcsmpSession);
        }
    }

Do you have any suggestions how to do it right?
Thanks and best regards
Mike