Unable to reuse sessions in Spring Boot while publishing persistent messages to Solace over SMF

We have been trying to create a bean that creates the JCSMPSession. In the class that uses the session object, we call session.connect(). We publish the data in asynchronous mode. The code does not seem to be thread safe in this case, and we are not receiving the acknowledgements.
Can someone share a sample Spring Boot app where sessions are reused?

Comments

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 584 admin

    Hi @AvikMazumdar,

    Welcome to the community! I think this should be doable. Any chance you can share the code you're working with? I don't know of a sample Spring Boot app off the top of my head that shows this, but maybe we should create one :)

    I'm assuming you're using an XMLMessageProducer to publish events, which should have a JCSMPStreamingPublishEventHandler that receives the callback?

    This can definitely be confusing so I'd recommend making a thorough pass of the XMLMessageProducer javadocs as they cover Thread Safety, Message Ownership, and other considerations which likely apply here.

    -Marc

  • AvikMazumdar
    AvikMazumdar Member Posts: 8

    Hi,
    We are using XMLMessageProducer. We have a configuration class where we created a bean that creates an object of the XMLMessageProducer. We have a separate class for handling the acknowledgements (copied from the samples).

  • AvikMazumdar
    AvikMazumdar Member Posts: 8

    Can you please share code where Spring Boot uses beans and does not create a connection for each message? The requirement is to have code that runs when a request comes in, maybe a REST call to the service, and then the service sends data to a Solace topic. In our case we listen to JMS queues on WebLogic; whenever data is published to a queue, our JMS listeners pull the message and send it to a Solace topic.

  • AvikMazumdar
    AvikMazumdar Member Posts: 8

    Please share your email ID and I can share the code.

  • AvikMazumdar
    AvikMazumdar Member Posts: 8

    Here is the application.java:
    package com.isom.solaceproducer;

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.scheduling.annotation.EnableAsync;

    @SpringBootApplication
    @EnableAsync
    public class SolaceProducerApplication {

        public static void main(String[] args) {
            SpringApplication.run(SolaceProducerApplication.class, args);
        }

    }

    This class receives messages from JMS and calls a method on another class to send data:

  • AvikMazumdar
    AvikMazumdar Member Posts: 8
    edited December 2020 #8

    @marc Will wait for your reply.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 584 admin
    edited December 2020 #9

    Hi @AvikMazumdar,
    Thanks for the code and the further info. I know you specified JCSMP (our own proprietary Java API), but for awareness we also have a JMS API, which you could use if you're already familiar with JMS from using it with WebLogic. Spring also has a lot of support already built up around JMS.

    But, to do this with JCSMP I think you want something like below (as well as your JCSMPStreamingPublishCorrelatingEventHandler impl):
    Your AppConfig can define the JCSMPSession and XMLMessageProducer as beans. By default Spring gives beans singleton scope, which lets you reuse the same producer and session for each message published (this also means you are not opening a new TCP connection to Solace for each message).

    package com.example.demo;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    
    import com.solacesystems.jcsmp.JCSMPException;
    import com.solacesystems.jcsmp.JCSMPSession;
    import com.solacesystems.jcsmp.SpringJCSMPFactory;
    import com.solacesystems.jcsmp.XMLMessageProducer;
    
    
    @Configuration
    public class AppConfig {
    
        @Autowired
        private SpringJCSMPFactory solaceFactory; 
    
        @Bean
        protected JCSMPSession jcsmpSession() throws JCSMPException {
            JCSMPSession session = solaceFactory.createSession();
            session.connect();
            return session;
        }
    
        @Bean
        public XMLMessageProducer xmlMessageProducer(JCSMPSession session) throws JCSMPException {
            DemoPublishEventHandler pubEventHandler = new DemoPublishEventHandler();
            return session.getMessageProducer(pubEventHandler);
        }
    
    }
    

    Then in your actual code, autowire the XMLMessageProducer and you should be good to go! Here is a sample app that can be triggered via a curl request such as curl -H "Content-Type: application/json" -X POST -d '{"topic":"foo/bar/1","payload":"100"}' http://localhost:8080. The code publishes the payload to the topic defined in the body of the POST. You can log in to your PS+ Manager and see that there is one connection which remains throughout the app's lifetime and, under stats, see that "Client Data Messages (msgs)" goes up each time you send a message.

    package com.example.demo;
    
    import static org.springframework.web.bind.annotation.RequestMethod.POST;
    
    import java.util.Map;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.http.HttpHeaders;
    import org.springframework.http.HttpStatus;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestHeader;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.ResponseStatus;
    import org.springframework.web.bind.annotation.RestController;
    
    import com.fasterxml.jackson.databind.ObjectMapper;
    import com.solacesystems.jcsmp.DeliveryMode;
    import com.solacesystems.jcsmp.JCSMPFactory;
    import com.solacesystems.jcsmp.TextMessage;
    import com.solacesystems.jcsmp.Topic;
    import com.solacesystems.jcsmp.XMLMessageProducer;
    
    @RestController
    @SpringBootApplication
    public class SpringBootJcsmpSampleApplication {
    
        @Autowired
        private ObjectMapper jsonMapper;
    
        @Autowired
        private XMLMessageProducer prod;
    
        public static void main(String[] args) {
            SpringApplication.run(SpringBootJcsmpSampleApplication.class, args);
        }
    
        @SuppressWarnings("unchecked")
        @RequestMapping(path = "/", method = POST, consumes = "*/*")
        @ResponseStatus(HttpStatus.ACCEPTED)
        public void handleRequest(@RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) throws Exception {
            System.out.println("Request Received");
            Map<String, String> payload = jsonMapper.readValue(body, Map.class);
            String topicName = payload.get("topic");
            String msgPayload = payload.get("payload");
    
            final Topic topic = JCSMPFactory.onlyInstance()
                    .createTopic(topicName);
    
            final TextMessage msg = JCSMPFactory.onlyInstance().createMessage(TextMessage.class);
            msg.setDeliveryMode(DeliveryMode.PERSISTENT);
            msg.setCorrelationKey(topic);
    
            System.out.println("Payload is " + msgPayload);
            msg.setText(msgPayload);
            System.out.println("Sending Message to " + topic);
            prod.send(msg, topic);
        }
    
    
    }
    

    Note a few things:
    1. In the sample above I create a new message from the JCSMPFactory for each request. You can actually re-use the message each time and use msg.reset() to clear it out.
    2. The JCSMPStreamingPublishCorrelatingEventHandler receives the async callback on its own thread so the approach you're taking with storing the msg info makes sense; then when you receive the callback you can mark the message as complete and ACK it back to weblogic (or whatever you need to do). That said, there is no need to block after sending each message unless you absolutely want to wait and handle one message at a time. Blocking after each send will obviously slow your app down so it's up to you and your required business logic.
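
    To illustrate point 2, here is a minimal sketch of the non-blocking bookkeeping, written in plain Java so it runs without the Solace library. PublishCallbacks is a local stand-in whose two methods mirror the key-based callbacks responseReceivedEx and handleErrorEx of JCSMPStreamingPublishCorrelatingEventHandler (with Exception in place of JCSMPException). PendingAcks, newKey, register and outstanding are hypothetical names, not part of any API. It assumes each message gets its own correlation key, since a key shared between messages (such as the topic) could not tell two in-flight messages apart.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Small demo: register a key, then simulate the broker's ACK arriving later.
class PendingAcksDemo {
    public static void main(String[] args) {
        PendingAcks acks = new PendingAcks();
        Long key = acks.newKey();
        // The caller does not block; it only says what should happen on confirmation.
        acks.register(key).thenRun(
                () -> System.out.println("Message " + key + " confirmed; safe to ACK upstream"));
        // In the real app this is where the message would be sent.
        acks.responseReceivedEx(key); // simulated ACK callback
    }
}

// Stand-in (assumption) for the key-based callbacks of the JCSMP publish handler,
// declared locally so the sketch compiles without the Solace library.
interface PublishCallbacks {
    void responseReceivedEx(Object key);
    void handleErrorEx(Object key, Exception cause, long timestamp);
}

// Hypothetical helper: maps each correlation key to a future that
// completes when the broker acknowledges or rejects that message.
class PendingAcks implements PublishCallbacks {
    private final Map<Object, CompletableFuture<Void>> pending = new ConcurrentHashMap<>();
    private final AtomicLong nextKey = new AtomicLong();

    // Generates a unique key to attach to one message.
    Long newKey() {
        return nextKey.incrementAndGet();
    }

    // Must be called before sending, so the callback always finds the entry.
    CompletableFuture<Void> register(Object key) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.put(key, future);
        return future;
    }

    @Override
    public void responseReceivedEx(Object key) {
        CompletableFuture<Void> future = pending.remove(key);
        if (future != null) {
            future.complete(null); // positive acknowledgement
        }
    }

    @Override
    public void handleErrorEx(Object key, Exception cause, long timestamp) {
        CompletableFuture<Void> future = pending.remove(key);
        if (future != null) {
            future.completeExceptionally(cause); // rejected or failed
        }
    }

    // Number of messages sent but not yet confirmed.
    int outstanding() {
        return pending.size();
    }
}
```

    In the real app the key would be set with msg.setCorrelationKey(key) before prod.send(msg, topic), and the handler instance would be the one passed to session.getMessageProducer(...). The thenRun callback is where the acknowledgement back to WebLogic would go.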

    Hope that helps!