Hi
I am new to Solace. I have been trying to use Solace with a Spark cluster, using DStreams.
My sample code for the producer looks like this:
private XMLMessageProducer managerProducer;

// In my init method, I am creating the instance of managerProducer like shown below
managerProducer = session.getMessageProducer(new JCSMPStreamingPublishEventHandler() {
    @Override
    public void handleError(String messageID, JCSMPException cause, long timestamp) {
        logger.error("error connecting to solace: {}, timestamp {} and error code: {}", messageID, timestamp, cause);
    }

    @Override
    public void responseReceived(String messageID) {
        logger.info("Response received from solace: {}", messageID);
    }
});
When I try to send a message using the producer, I get the exception below.
Is this by design, or am I doing something wrong?
User class threw exception: java.io.NotSerializableException: DStream checkpointing has been enabled but the DStreams with their functions are not serializable
com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer
Serialization stack:
- object not serializable (class: com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer, value: com.solacesystems.jcsmp.impl.JCSMPXMLMessageProducer@2124aef4)
- field (class: com.xxx.xxx.stream.xxxxx, name: managerProducer, type: interface com.solacesystems.jcsmp.XMLMessageProducer)
- object (class com.xxx.xxx.stream.xxxxx, com.xxxxxxxxxxxxxxxxxxx@2efbc1b7)
- field (class: com.xxxxxxxxx$1, name: this$0, type: xxxxxxxxxxxxxxxxxxxxxxxx)
- object (xxxxxxxxxxxxxxxxxxxxxxxxxxxxx, <function1>)
- field (class: org.apache.spark.streaming.jms.BaseJmsReceiver, name: messageConverter, type: interface scala.Function1)
- object (class org.apache.spark.streaming.jms.AsynchronousJmsReceiver, org.apache.spark.streaming.jms.AsynchronousJmsReceiver@3f6a56bd)
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.ForEachDStream, org.apache.spark.streaming.dstream.ForEachDStream@759d95f0)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.ForEachDStream@759d95f0))
- writeObject data (class: org.apache.spark.streaming.dstream.DStreamCheckpointData)
- object (class org.apache.spark.streaming.dstream.DStreamCheckpointData, [
0 checkpoint files
])
- writeObject data (class: org.apache.spark.streaming.dstream.DStream)
- object (class org.apache.spark.streaming.dstream.PluggableInputDStream, org.apache.spark.streaming.dstream.PluggableInputDStream@2e67b8ef)
- element of array (index: 0)
- array (class [Ljava.lang.Object;, size 16)
- field (class: scala.collection.mutable.ArrayBuffer, name: array, type: class [Ljava.lang.Object;)
- object (class scala.collection.mutable.ArrayBuffer, ArrayBuffer(org.apache.spark.streaming.dstream.PluggableInputDStream@2e67b8ef))
- writeObject data (class: org.apache.spark.streaming.DStreamGraph)
- object (class org.apache.spark.streaming.DStreamGraph, org.apache.spark.streaming.DStreamGraph@737d20bb)
- field (class: org.apache.spark.streaming.Checkpoint, name: graph, type: class org.apache.spark.streaming.DStreamGraph)
- object (class org.apache.spark.streaming.Checkpoint, org.apache.spark.streaming.Checkpoint@6c6554d5)
at org.apache.spark.streaming.StreamingContext.validate(StreamingContext.scala:531)
at org.apache.spark.streaming.StreamingContext.liftedTree1$1(StreamingContext.scala:576)
at org.apache.spark.streaming.StreamingContext.start(StreamingContext.scala:575)
at org.apache.spark.streaming.api.java.JavaStreamingContext.start(JavaStreamingContext.scala:556)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.spark.deploy.yarn.ApplicationMaster$$anon$2.run(ApplicationMaster.scala:685)
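
For reference, the serialization stack above shows the producer being reached through the managerProducer field of my class, which the messageConverter function holds via this$0. Below is a minimal sketch, using only the JDK (no Solace or Spark classes), of the same kind of failure and of one commonly suggested workaround: marking the field transient and creating it lazily. FakeProducer, EagerHolder, LazyHolder, and canSerialize are hypothetical names made up for this illustration, and I have not confirmed that this is the recommended approach for JCSMP.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class TransientFieldSketch {

    // Hypothetical stand-in for a client object that is not Serializable.
    static class FakeProducer {
        void send(String payload) {
            // A real producer would publish the payload here.
        }
    }

    // Mirrors the failing layout: a serializable class keeps the producer in a normal field.
    static class EagerHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private final FakeProducer producer = new FakeProducer();
    }

    // Alternative layout: the field is transient, so Java serialization skips it,
    // and the producer is created on first use in whichever JVM calls producer().
    static class LazyHolder implements Serializable {
        private static final long serialVersionUID = 1L;
        private transient FakeProducer producer;

        FakeProducer producer() {
            if (producer == null) {
                producer = new FakeProducer();
            }
            return producer;
        }
    }

    // Returns true if Java serialization accepts the object, false if it hits
    // a non-serializable member.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(canSerialize(new EagerHolder())); // prints "false"

        LazyHolder lazy = new LazyHolder();
        lazy.producer().send("hello");                        // producer now exists locally
        System.out.println(canSerialize(lazy));               // prints "true"
    }
}
```

Note that a transient field comes back as null after deserialization, which is why the sketch creates the producer inside the accessor rather than in an init method.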