XMLMessageProducer - java.io.NotSerializableException
Hi,
I am new to Solace. I have been trying to use Solace with a Spark cluster using DStreams.
My sample code for the producer looks like this:
private XMLMessageProducer managerProducer;

// In my init method, I am creating the instance of managerProducer as shown below
managerProducer = session.getMessageProducer(new JCSMPStreamingPublishEventHandler() {
    @Override
    public void handleError(String messageID, JCSMPException cause, long timestamp) {
        logger.error("error connecting to solace: {}, timestamp {} and error code: {}",
                messageID, timestamp, cause);
    }

    @Override
    public void responseReceived(String messageID) {
        logger.info("Response received from solace: {}", messageID);
    }
});
When I try to send a message using the producer, I get the exception below.
Is this by design, or am I doing something wrong?
User class threw exception: java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer
Serialization stack:
    - object not serializable (class: com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer, value: com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer@2124aef4)
    - field (class: com.xxx.xxx.stream.xxxxx, name: managerProducer, type: interface com.solacesystems.jcsmp.XMLMessageProducer)
    - object (class com.xxx.xxx.stream.xxxxx, - com.xxxxxxxxxxxxxxxxxxx@2efbc1b7)
    - field (class: com.xxxxxxxxx$1, name: this$0, type: xxxxxxxxxxxxxxxxxxxxxxxx)
    - object (xxxxxxxxxxxxxxxxxxxxxxxxxxxxx, <function1>)
    - field (class: org.apache.spark.streaming.jms.BaseJmsReceiver, name: messageConverter, type: interface scala.Function1)
    - object (class org.apache.spark.streaming.jms.AsynchronousJmsReceiver, org.apache.spark.streaming.jms.AsynchronousJmsReceiver@3f6a56bd)
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ])
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@759d95f0)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 16)
    - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@759d95f0))
    - writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
    - object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [ 0 checkpoint files ])
    - writeObject data (class: org.apache.spark.streaming.dstream.DStream)
    - object (class org.apache.spark.streaming.dstream.PluggableInputDStream, org.apache.spark.streaming.dstream.PluggableInputDStream@2e67b8ef)
    - element of array (index: 0)
    - array (class [Ljava.lang.Object;, size 16)
    - field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
    - object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.PluggableInputDStream@2e67b8ef))
    - writeObject data (class: org.apache.spark.streaming.DStreamGraph)
    - object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@737d20bb)
    - field (class: org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph)
    - object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@6c6554d5)
    at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:531)
    at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:576)
    at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:575)
    at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:685)
Comments
-
Hi @naveenv712, welcome to the Community!
So, I don't know much about DStreams... but if you're planning to write some data into Solace messages for publishing, you'll need to be able to serialize that content to attach it to the Solace payload.
But the sample code you posted is only the message publisher callback methods with some logging, so I can't tell what you are actually trying to do. Have you seen these samples? https://github.com/SolaceSamples/solace-samples-java-jcsmp
And from your stack trace, it looks like you are trying to send the Solace producer object itself as a Spark message?? Is that correct? That seems backwards. Check out the samples (maybe this one for Guaranteed Messages: https://github.com/SolaceSamples/solace-samples-java-jcsmp/blob/master/src/main/java/com/solace/samples/jcsmp/patterns/GuaranteedPublisher.java).
Finally, can you explain a bit more about your use case... how exactly are you trying to integrate these? E.g. Spark --> Solace, or Solace --> Spark? What type of data, etc.? There are potentially some off-the-shelf solutions so you don't have to go coding things manually.
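For reference, the core of a basic JCSMP publisher (like the hello-world samples in that repo) boils down to something like this rough sketch; the host/VPN/credentials and topic string here are just placeholders you'd swap for your own:

import com.solacesystems.jcsmp.*;

public class MinimalPublisher {
    public static void main(String... args) throws JCSMPException {
        JCSMPProperties props = new JCSMPProperties();
        props.setProperty(JCSMPProperties.HOST, "tcp://someHost");      // placeholder
        props.setProperty(JCSMPProperties.VPN_NAME, "someVPN");         // placeholder
        props.setProperty(JCSMPProperties.USERNAME, "someUser");        // placeholder
        props.setProperty(JCSMPProperties.PASSWORD, "somePassword");    // placeholder

        JCSMPSession session = JCSMPFactory.onlyInstance().createSession(props);
        session.connect();

        // create the producer ONCE and reuse it; the callback receives broker ACKs/NACKs
        XMLMessageProducer producer = session.getMessageProducer(new JCSMPStreamingPublishEventHandler() {
            @Override public void responseReceived(String messageID) { }
            @Override public void handleError(String messageID, JCSMPException cause, long timestamp) { }
        });

        // per message: build the payload and the (cheap) Topic object, then send
        TextMessage msg = JCSMPFactory.onlyInstance().createMessage(TextMessage.class);
        msg.setText("Hello from Spark");
        msg.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(msg, JCSMPFactory.onlyInstance().createTopic("solace/samples/hello"));

        session.closeSession();
    }
}

The key point is that the session and producer get created once up front, and only the message (and the cheap Topic object) is built per publish.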
-
Thanks @Aaron.
I should have been clearer. This is what I am doing:
MQ --> stream of messages (Spark Streaming) --> published to a Solace queue <-- another Solace consumer reads it.
public class SolaceConnectionManager {

    private static final Logger logger = LoggerFactory.getLogger(SolaceConnectionManager.class);

    private JCSMPSession session;

    public static final String HOST = "tcp://someHost";
    public static final String VPN_NAME = "someVPN";
    public static final String USERNAME = "someUser";
    public static final String PASSWORD = "somePassword";

    public void connectSolace(Properties properties, String encKey) throws Exception {
        // establish solace session
        JCSMPProperties solaceproperties = new JCSMPProperties();
        solaceproperties.setProperty(JCSMPProperties.HOST, properties.get("host"));
        solaceproperties.setProperty(JCSMPProperties.VPN_NAME, properties.get("vpn"));
        solaceproperties.setProperty(JCSMPProperties.USERNAME, properties.get("username"));
        solaceproperties.setProperty(JCSMPProperties.PASSWORD, Decrypter.decrypt((String) properties.get("password"), encKey));
        //solaceproperties.setProperty(JCSMPProperties.PUB_ACK_TIME, 10000);

        if (((String) properties.get("host")).contains(",")) {
            // ha - automatic reconnects; channel properties must be set before the session is created to take effect
            JCSMPChannelProperties channelProperties =
                    (JCSMPChannelProperties) solaceproperties.getProperty(JCSMPProperties.CLIENT_CHANNEL_PROPERTIES);
            channelProperties.setConnectRetries(3);
            channelProperties.setReconnectRetries(3);
            channelProperties.setReconnectRetryWaitInMillis(2000);
            channelProperties.setConnectRetriesPerHost(3);  // Solace recommends 3 or more
        }

        session = JCSMPFactory.onlyInstance().createSession(solaceproperties);
    }

    public void disconnect() {
        if (!session.isClosed()) {
            session.closeSession();
        }
    }
}

// Method in another class used for sending a message
public void sendMessageToSolace(Row row, Properties properties, String encKey) throws Exception {
    SolaceConnectionManager solaceConnectionManager = new SolaceConnectionManager();
    solaceConnectionManager.connectSolace(properties, encKey);
    String basePublishTopic = properties.getProperty("basePublishTopic");
    Topic topic = JCSMPFactory.onlyInstance().createTopic(basePublishTopic);
    XMLMessageProducer producer = solaceConnectionManager.createProducer();
    BytesXMLMessage payloadMessage = generateSolaceMessage(DeliveryMode.PERSISTENT, row);
    sendMessageSolace(producer, payloadMessage, topic, row);
    producer.close();
    solaceConnectionManager.disconnect();
}
What I wanted to do:

1: I want to AVOID doing the following for EACH message being sent:

    solaceConnectionManager.connectSolace(properties, encKey);
    Topic topic = JCSMPFactory.onlyInstance().createTopic(basePublishTopic);
    XMLMessageProducer producer = solaceConnectionManager.createProducer();
    producer.close();
    solaceConnectionManager.disconnect();

2: I tried to keep the XMLMessageProducer producer as a class variable, and I got the NotSerializableException. I want to AVOID creating the Producer, Topic, etc. for each message sent. Any suggestions?
-
Can anybody suggest something, please?
-
Hey @naveenv712... OK, yes, you definitely don't want to be creating the connection every time you send a message; that is a terrible idea. Same with the producer object: that should only happen during initialization. The Topic object is very lightweight, essentially just a String, so you can (and should!) create it for each new message... especially if you want to do cool things like dynamic topics, e.g.

logs/<host>/<PID>/<severity>/<event>

or something like that.

I don't know what this method does:

sendMessageSolace(producer, payloadMessage, topic, row);

but it probably looks similar to other logging framework integrations I've done. I wouldn't keep the Session or XMLMessageProducer as class (static) variables in case you have multiple instances running; make them instance variables.

Check out these for maybe some inspiration?
https://github.com/SolaceLabs/logstash-output-solace/blob/main/src/main/java/com/solace/aa/logstash/output/Solace.java
https://github.com/aaron-613/log4j-solace-appender/blob/master/src/main/java/com/solace/aaron/log4j/appender/SolaceManager.java
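To make the "create once, reuse for every message" idea a bit more concrete, here is a rough sketch of one common way to handle non-serializable connections in Spark Streaming: a lazily-initialized holder that lives on each executor JVM and is only touched from inside foreachRDD/foreachPartition, so the JCSMP session and producer never get serialized with the DStream. The class name, placeholder connection details, and topic usage below are assumptions for illustration, not a drop-in implementation:

import com.solacesystems.jcsmp.*;

// Hypothetical helper: one JCSMP session + producer per executor JVM, created lazily,
// so nothing Solace-related ever needs to be serialized with the DStream closure.
public class SolacePublisherHolder {

    private static volatile XMLMessageProducer producer;  // reused for every message
    private static JCSMPSession session;

    private static synchronized void init() throws JCSMPException {
        if (producer != null) return;
        JCSMPProperties props = new JCSMPProperties();
        props.setProperty(JCSMPProperties.HOST, "tcp://someHost");      // placeholders
        props.setProperty(JCSMPProperties.VPN_NAME, "someVPN");
        props.setProperty(JCSMPProperties.USERNAME, "someUser");
        props.setProperty(JCSMPProperties.PASSWORD, "somePassword");
        session = JCSMPFactory.onlyInstance().createSession(props);
        session.connect();
        producer = session.getMessageProducer(new JCSMPStreamingPublishEventHandler() {
            @Override public void responseReceived(String messageID) { /* broker ACK */ }
            @Override public void handleError(String messageID, JCSMPException cause, long ts) { /* NACK/error */ }
        });
    }

    // Called per message, e.g. from inside foreachRDD/foreachPartition on the executors.
    // Only the cheap Topic object and the message itself are created per send.
    public static void publish(byte[] payload, String dynamicTopic) throws JCSMPException {
        if (producer == null) init();
        BytesMessage msg = JCSMPFactory.onlyInstance().createMessage(BytesMessage.class);
        msg.setData(payload);
        msg.setDeliveryMode(DeliveryMode.PERSISTENT);
        producer.send(msg, JCSMPFactory.onlyInstance().createTopic(dynamicTopic));
    }
}

Each Spark task would then just call something like SolacePublisherHolder.publish(payloadBytes, "logs/" + host + "/" + severity), and the session/producer survive across messages and batches instead of being rebuilt per send.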