XMLMessageProducer - java.io.NotSerializableException

Hi

I am new to Solace and have been trying to use it with a Spark cluster and DStreams.

My sample code for the producer looks like this:

private XMLMessageProducer managerProducer;

// In my init method, I create the managerProducer instance as shown below
managerProducer = session.getMessageProducer(new JCSMPStreamingPublishEventHandler() {
    @Override
    public void handleError(String messageID, JCSMPException cause, long timestamp) {
        logger.error("error connecting to solace: {}, timestamp {} and error code: {}", messageID, timestamp, cause);
    }

    @Override
    public void responseReceived(String messageID) {
        logger.info("Response received from solace: {}", messageID);
    }
});

When I try to send a message using the producer, I get the exception below.

Is this by design, or am I doing something wrong?

User class threw exception: java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer
Serialization stack:
- object not serializable (class: com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer, value: com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer@2124aef4)
- field (class: com.xxx.xxx.stream.xxxxx, name: managerProducer, type: interface com.solacesystems.jcsmp.XMLMessageProducer)
- object (class com.xxx.xxx.stream.xxxxx, com.xxxxxxxxxxxxxxxxxxx@2efbc1b7)
- field (class: com.xxxxxxxxx$1, name: this$0, type: xxxxxxxxxxxxxxxxxxxxxxxx)
- object (xxxxxxxxxxxxxxxxxxxxxxxxxxxxx, <function1>)
- field (class: org.apache.spark.streaming.jms.BaseJmsReceiver, name: messageConverter, type: interface scala.Function1)
- object (class org.apache.spark.streaming.jms.AsynchronousJmsReceiver, org.apache.spark.streaming.jms.AsynchronousJmsReceiver@3f6a56bd)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@759d95f0)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@759d95f0))
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.PluggableInputDStream, org.apache.spark.streaming.dstream.PluggableInputDStream@2e67b8ef)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.PluggableInputDStream@2e67b8ef))
- writeObject data (class: org.apache.spark.streaming.DStreamGraph)
- object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@737d20bb)
- field (class: org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph)
- object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@6c6554d5)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:531)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:576)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:575)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:685)

Comments

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 595 admin

    Hi @naveenv712 , welcome to the Community!

    So, I don't know much about DStreams... but if you're planning to write some data into Solace messages for publishing, you'll need to be able to serialize that content to attach it to the Solace payload.

    But the sample code you posted is only the message publisher callback methods with some logging, so I don't know what you are trying to do. Have you seen this? https://github.com/SolaceSamples/solace-samples-java-jcsmp

    And from your stack trace, it looks like you are trying to send the Solace producer object as a Spark message?? Is that correct? That seems backwards. Check out the samples (maybe this one for Guaranteed Messages: https://github.com/SolaceSamples/solace-samples-java-jcsmp/blob/master/src/main/java/com/solace/samples/jcsmp/patterns/GuaranteedPublisher.java ).

    Finally, can you explain a bit more about your use case and how exactly you're trying to integrate these? E.g. Spark --> Solace, or Solace --> Spark? What type of data, etc.? There are potentially some off-the-shelf solutions so you don't have to code things manually.

  • naveenv712
    naveenv712 Member Posts: 4

    Thanks @Aaron.

    I should have been more clear. This is what I am doing.

    MQ --> Stream of Message (Spark Streaming) --> published to Solace queue <-- Another Solace consumer reads it.

    public class SolaceConnectionManager {
        private static final Logger logger = LoggerFactory.getLogger(SolaceConnectionManager.class);
        private JCSMPSession session;
        public static final String HOST = "tcp://someHost";
        public static final String VPN_NAME = "someVPN";
        public static final String USERNAME = "someUser";
        public static final String PASSWORD = "somePassword";
    
        public void connectSolace(Properties properties, String encKey) throws Exception {
            //establish solace session
            JCSMPProperties solaceproperties = new JCSMPProperties();
            solaceproperties.setProperty(JCSMPProperties.HOST, properties.get("host"));
            solaceproperties.setProperty(JCSMPProperties.VPN_NAME, properties.get("vpn"));
            solaceproperties.setProperty(JCSMPProperties.USERNAME, properties.get("username"));
            solaceproperties.setProperty(JCSMPProperties.PASSWORD, Decrypter.decrypt((String)properties.get("password"),encKey));
            //properties.setProperty(JCSMPProperties.PUB_ACK_TIME, 10000);
            session = JCSMPFactory.onlyInstance().createSession(solaceproperties);
    
            if (((String)properties.get("host")).contains(",")) {
                //ha - automatic reconnects
                JCSMPChannelProperties channelProperties = (JCSMPChannelProperties)solaceproperties.getProperty(JCSMPProperties.CLIENT_CHANNEL_PROPERTIES);
                channelProperties.setConnectRetries(3);
                channelProperties.setReconnectRetries(3);
                channelProperties.setReconnectRetryWaitInMillis(2000);
                channelProperties.setConnectRetriesPerHost(3); // Solace recommends 3 or more
            }
        }
        public void disconnect() {
            if(!session.isClosed()) {
                session.closeSession();
            }
        }
    }
    
    
    // Method in another class used for sending Message
    public void sendMessageToSolace(Row row, Properties properties, String encKey) throws Exception {
        SolaceConnectionManager solaceConnectionManager = new SolaceConnectionManager();
        solaceConnectionManager.connectSolace(properties, encKey);
        String basePublishTopic = properties.getProperty("basePublishTopic");
        Topic topic = JCSMPFactory.onlyInstance().createTopic(basePublishTopic);
        XMLMessageProducer producer = solaceConnectionManager.createProducer();
        BytesXMLMessage payloadMessage = generateSolaceMessage(DeliveryMode.PERSISTENT, row);
        sendMessageSolace(producer, payloadMessage, topic, row);
        producer.close();
        solaceConnectionManager.disconnect();
    }
    
    

    What I want to do:
    1: I want to AVOID doing the following for EACH message sent:
    solaceConnectionManager.connectSolace(properties,encKey);
    Topic topic = JCSMPFactory.onlyInstance().createTopic(basePublishTopic);
    XMLMessageProducer producer = solaceConnectionManager.createProducer();
    producer.close();
    solaceConnectionManager.disconnect();

    2: I tried keeping XMLMessageProducer producer as a class variable, and I got the NotSerializableException.

    I want to AVOID creating the producer, topic, etc. for each message sent. Any suggestions?

  • naveenv712
    naveenv712 Member Posts: 4

    Can anybody suggest, please?

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 595 admin

    Hey @naveenv712 ... ok, yes, you definitely don't want to be creating the connection every time you send a message; that is a terrible idea. Same with the producer object: that should be created once, during initialization. The Topic object is very lightweight, it's essentially just a String, so you can (and should!) create that for each new message, especially if you want to do cool things like dynamic topics. E.g. logs/<host>/<PID>/<severity>/<event> or something like that.
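
    To make the dynamic-topic idea concrete, here is a minimal sketch of building a topic string of the form logs/<host>/<PID>/<severity>/<event> for each message. The class TopicNames and its methods are hypothetical names made up for this illustration, and replacing '/', '*', '>' and whitespace inside a level is a conservative assumption of the sketch, not a documented requirement.

```java
import java.util.StringJoiner;

// Hypothetical helper (not part of the Solace API): builds a hierarchical
// topic string such as logs/<host>/<PID>/<severity>/<event> per message.
final class TopicNames {
    private TopicNames() {
    }

    static String logTopic(String host, long pid, String severity, String event) {
        StringJoiner joiner = new StringJoiner("/");
        joiner.add("logs")
              .add(clean(host))
              .add(Long.toString(pid))
              .add(clean(severity))
              .add(clean(event));
        return joiner.toString();
    }

    // Assumption of this sketch: replace the level delimiter '/', the
    // wildcard characters '*' and '>', and whitespace with '_' so that a
    // value cannot change the number of levels in the topic.
    static String clean(String level) {
        return level.replaceAll("[/*>\\s]", "_");
    }
}
```

    The resulting string would then be handed to JCSMPFactory.onlyInstance().createTopic(...) for each message, as in the code posted above, while the session and producer are reused.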

    I don't know what this method does: sendMessageSolace(producer, payloadMessage, topic, row); but it probably looks similar to other logging framework integrations I've done. I wouldn't keep the Session or XMLMessageProducer as class variables in case you have multiple instances running; make them instance variables.

    Check out these for some inspiration:
    https://github.com/SolaceLabs/logstash-output-solace/blob/main/src/main/java/com/solace/aa/logstash/output/Solace.java
    https://github.com/aaron-613/log4j-solace-appender/blob/master/src/main/java/com/solace/aaron/log4j/appender/SolaceManager.java
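
    On the NotSerializableException itself: the serialization stack in the question shows the managerProducer field being reached through this$0 of a function that Spark tries to serialize for checkpointing. One common way to keep a long-lived, non-serializable client object out of that path is to mark the field transient and create it lazily wherever the object ends up running. The sketch below shows only that pattern using plain Java serialization. FakeProducer is a hypothetical stand-in for XMLMessageProducer, LazySender is a made-up name, and whether this fits a particular Spark job is an assumption that would need testing.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a non-serializable client object such as
// XMLMessageProducer. Note that it does NOT implement Serializable.
final class FakeProducer {
    private int sent;

    void send(String payload, String topic) {
        sent++; // a real producer would publish the message here
    }

    int sentCount() {
        return sent;
    }
}

// Serializable sender: only the configuration is serialized, never the producer.
final class LazySender implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String topicName;
    // transient: skipped by Java serialization, so it is null after deserialization
    private transient FakeProducer producer;

    LazySender(String topicName) {
        this.topicName = topicName;
    }

    // Creates the producer on first use and reuses it for every later message.
    private synchronized FakeProducer producer() {
        if (producer == null) {
            producer = new FakeProducer();
        }
        return producer;
    }

    void send(String payload) {
        producer().send(payload, topicName);
    }

    synchronized int sentCount() {
        return producer == null ? 0 : producer.sentCount();
    }

    // Serializes and deserializes a sender in memory, to mimic what a
    // framework does when it ships or checkpoints the object.
    static LazySender roundTrip(LazySender sender) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(sender);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (LazySender) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

    In a real integration, the lazy initializer would be the place to create the session and call session.getMessageProducer(...) once, and each send would reuse that producer instead of connecting and disconnecting per message.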