Unable to reuse sessions in Spring Boot while producing persistent messages to Solace SMF
We have been trying to create a bean that creates the JCSMPSession. In that class we use the session object to call session.connect(). We publish the data in asynchronous mode. The code does not seem to be thread safe in this case, and we are not able to receive the acknowledgement.
Can someone share a sample Spring Boot app where sessions are reused?
Comments
-
Hi @AvikMazumdar,
Welcome to the community! I think this should be doable. Any chance you can share the code you're working with? I don't know of a sample Spring Boot app off the top of my head that shows this, but maybe we should create one.
I'm assuming you're using an XMLMessageProducer to publish events, which should have a JCSMPStreamingPublishEventHandler that receives the callback?
This can definitely be confusing, so I'd recommend making a thorough pass of the XMLMessageProducer javadocs, as they cover Thread Safety, Message Ownership, and other considerations which likely apply here.
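To illustrate why the thread-safety notes matter for the callback, here is a minimal sketch in plain Java. It is not JCSMP code: `AckListener` and `PendingAcks` are hypothetical stand-ins for the publish event handler and for whatever bookkeeping your app keeps. It assumes every message gets a unique correlation key and that the callback arrives on a different thread from the sender, which is why the maps are `ConcurrentHashMap`s.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a publish event handler (NOT the JCSMP interface).
interface AckListener {
    void onAck(Object correlationKey);
    void onError(Object correlationKey, Exception cause);
}

// Bookkeeping shared by the sending thread and the callback thread.
final class PendingAcks implements AckListener {
    private final Map<Object, String> pending = new ConcurrentHashMap<>();
    private final Map<Object, String> failed = new ConcurrentHashMap<>();

    // Call before handing the message to the producer, so an early ack is not missed.
    void track(Object correlationKey, String description) {
        pending.put(correlationKey, description);
    }

    @Override
    public void onAck(Object correlationKey) {
        // Acknowledged: the message is no longer outstanding.
        pending.remove(correlationKey);
    }

    @Override
    public void onError(Object correlationKey, Exception cause) {
        // Rejected: move the entry to the failed map so it can be retried or reported.
        String description = pending.remove(correlationKey);
        if (description != null) {
            failed.put(correlationKey, description + " failed: " + cause.getMessage());
        }
    }

    int outstanding() { return pending.size(); }

    int failures() { return failed.size(); }
}
```

In a real app the two callback methods would be invoked from your handler implementation; the names used here are only for illustration.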
-Marc
0 -
Hi,
We are using XMLMessageProducer. We have a configuration class where we created a bean that creates an XMLMessageProducer object. We have a separate class for handling the acknowledgements (copied from the samples).
0 -
Can you please share code where Spring Boot uses beans and does not create a connection for each message? The requirement is code that runs when a request comes in, maybe a REST call to the service, after which the service sends data to a Solace topic. In our case we listen to JMS queues on WebLogic; whenever data is published to a queue, our JMS listeners pull the message and send it to a Solace topic.
0 -
Please share your email ID and I can share the code.
0 -
Here is the application.java:

package com.isom.solaceproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableAsync;

@SpringBootApplication
@EnableAsync
public class SolaceProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SolaceProducerApplication.class, args);
    }
}
This class receives messages from JMS and calls a method on another class to send data:
0 -
Here are the main classes of the code
1 -
@marc Will wait for your reply.
1 -
Hi @AvikMazumdar,
Thanks for the code and the further info. I know you specified JCSMP (which is our own proprietary Java API), but for awareness we also have a JMS API, which you could use if you're already familiar with JMS since you're using it with WebLogic. Spring also has a lot of support already built up around JMS.
But to do this with JCSMP, I think you want something like the below (as well as your JCSMPStreamingPublishCorrelatingEventHandler impl):
Your AppConfig can define the JCSMPSession and XMLMessageProducer as beans. By default Spring gives these beans singleton scope, which allows you to reuse the same producer and session for each message published (this also means you are not making a new TCP connection out to Solace for each message).

package com.example.demo;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.SpringJCSMPFactory;
import com.solacesystems.jcsmp.XMLMessageProducer;

@Configuration
public class AppConfig {

    @Autowired
    private SpringJCSMPFactory solaceFactory;

    // Singleton session: created and connected once, then shared.
    @Bean
    protected JCSMPSession jcsmpSession() throws JCSMPException {
        JCSMPSession session = solaceFactory.createSession();
        session.connect();
        return session;
    }

    // Singleton producer built on the shared session.
    @Bean
    public XMLMessageProducer xmlMessageProducer(JCSMPSession session) throws JCSMPException {
        DemoPublishEventHandler pubEventHandler = new DemoPublishEventHandler();
        final XMLMessageProducer prod = session.getMessageProducer(pubEventHandler);
        return prod;
    }
}
Then in your actual code just go ahead and autowire the XMLMessageProducer and you should be good to go! Here is a sample app that can be triggered via a curl request such as
curl -H "Content-Type: application/json" -X POST -d '{"topic":"foo/bar/1","payload":"100"}' http://localhost:8080
The code will publish the payload to the topic defined in the body of the POST. You can log in to your PS+ Manager and see that there is one connection which remains throughout the app's lifetime, and under stats see that "Client Data Messages (msgs)" goes up each time you send a message.

package com.example.demo;

import static org.springframework.web.bind.annotation.RequestMethod.POST;

import java.util.Map;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.ResponseStatus;
import org.springframework.web.bind.annotation.RestController;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.TextMessage;
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessageProducer;

@RestController
@SpringBootApplication
public class SpringBootJcsmpSampleApplication {

    @Autowired
    private ObjectMapper jsonMapper;

    // The singleton producer bean defined in AppConfig.
    @Autowired
    private XMLMessageProducer prod;

    public static void main(String[] args) {
        SpringApplication.run(SpringBootJcsmpSampleApplication.class, args);
    }

    @SuppressWarnings("unchecked")
    @RequestMapping(path = "/", method = POST, consumes = "*/*")
    @ResponseStatus(HttpStatus.ACCEPTED)
    public void handleRequest(@RequestBody String body,
            @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) throws Exception {
        System.out.println("Request Received");

        // The request body carries both the target topic and the payload.
        Map<String, String> payload = jsonMapper.readValue(body, Map.class);
        String topicName = payload.get("topic");
        String msgPayload = payload.get("payload");

        final Topic topic = JCSMPFactory.onlyInstance().createTopic(topicName);
        final TextMessage msg = JCSMPFactory.onlyInstance().createMessage(TextMessage.class);
        msg.setDeliveryMode(DeliveryMode.PERSISTENT);
        // The correlation key is handed back to the publish event handler.
        msg.setCorrelationKey(topic);
        System.out.println("Payload is " + msgPayload);
        msg.setText(msgPayload);

        System.out.println("Sending Message to " + topic);
        prod.send(msg, topic);
    }
}
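If curl is not handy, the same request can be built from Java. This is a minimal sketch using the JDK's `java.net.http` client (Java 11+). The URL and JSON body are the ones from the curl example; the class name `PublishRequestExample` is made up for illustration.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

final class PublishRequestExample {
    // Same JSON body as in the curl example.
    static final String BODY = "{\"topic\":\"foo/bar/1\",\"payload\":\"100\"}";

    // Builds the POST request without sending it.
    static HttpRequest buildRequest(String baseUrl) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(BODY))
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        HttpResponse<String> response = client.send(
                buildRequest("http://localhost:8080"),
                HttpResponse.BodyHandlers.ofString());
        // The sample controller is annotated with HttpStatus.ACCEPTED.
        System.out.println("HTTP status: " + response.statusCode());
    }
}
```

Running `main` requires the sample app to be listening on localhost:8080.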
Note a few things:
1. In the sample above I create a new message from the JCSMPFactory for each request. You can actually reuse the message each time and use msg.reset() to clear it out.
2. The JCSMPStreamingPublishCorrelatingEventHandler receives the async callback on its own thread, so the approach you're taking with storing the msg info makes sense; then when you receive the callback you can mark the message as complete and ACK it back to WebLogic (or whatever you need to do). That said, there is no need to block after sending each message unless you absolutely want to wait and handle one message at a time. Blocking after each send will slow your app down, so it's up to you and your required business logic.
Hope that helps!
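As a follow-up to the second note, sending without blocking per message can be sketched in plain Java. This is a sketch under assumptions, not JCSMP code: `AckFutures` and `NonBlockingSendDemo` are hypothetical names, `complete` and `fail` stand in for what your handler's ack and error callbacks would call, and the `runAsync` call merely simulates the broker acknowledging on another thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper: one future per in-flight message, keyed by correlation key.
final class AckFutures {
    private final Map<Object, CompletableFuture<Void>> inFlight = new ConcurrentHashMap<>();

    // Register before sending; the future completes when the ack (or error) arrives.
    CompletableFuture<Void> register(Object correlationKey) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        inFlight.put(correlationKey, future);
        return future;
    }

    // Intended to be called from the handler's ack callback.
    void complete(Object correlationKey) {
        CompletableFuture<Void> future = inFlight.remove(correlationKey);
        if (future != null) {
            future.complete(null);
        }
    }

    // Intended to be called from the handler's error callback.
    void fail(Object correlationKey, Exception cause) {
        CompletableFuture<Void> future = inFlight.remove(correlationKey);
        if (future != null) {
            future.completeExceptionally(cause);
        }
    }

    int inFlightCount() { return inFlight.size(); }
}

final class NonBlockingSendDemo {
    // Sends n simulated messages back to back, then waits once for all acks.
    static int sendBatch(AckFutures acks, int n) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            String key = "msg-" + i;
            futures.add(acks.register(key));
            // Simulated ack on another thread; a real app would send the message here.
            CompletableFuture.runAsync(() -> acks.complete(key));
        }
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.size();
    }
}
```

Instead of joining at the end, each future could carry a `thenRun(...)` that acknowledges the original JMS message, so no thread has to wait at all.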
0 -
Hi @Shaz,
If you're looking at using our Spring Boot Starter for JCSMP then there is an example that both sends and receives here: https://github.com/SolaceProducts/solace-spring-boot/tree/master/solace-spring-boot-samples/solace-java-sample-app
Note that if you don't need to use JCSMP for any specific reason I usually see devs using Spring go with our Spring Boot JMS Starter instead since Spring offers a lot of support for JMS which can be leveraged.
If you have any follow-up questions please go ahead and create a fresh discussion vs. continuing to add to this old thread.
Hope that helps!
-Marc
0