Writing to Solace with Spark

harry
harry Member Posts: 12

i have a spark dataframe and i want to write it to solace i.e. i want to send the rows of dataframe as events to solace(publishing basically) in a distributed way . like spark to kafka.

Comments

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 579 admin

    Hey @harry. I did some searching and found this, might it help? https://github.com/jksinghpro/spark-jms

    My colleague @Ken Overton did a Spark-JMS integration, but on the other side: https://github.com/koverton/spark-jms-connector I'm wondering if he might have any insight for taking data out of Spark and posting on Solace.

  • Pankaj Arora
    Pankaj Arora Member Posts: 2

    @harry I had created a Structured Streaming with Datasource V2 https://github.com/pankajsa/solace-spark to connect Solace & Spark. The current code has implemented the reader interfaces and you can extend it by implementing the writer interface.

  • harry
    harry Member Posts: 12

    @Aaron @Pankaj Arora @Ken Overton thanks a lot guys , let me check out those links, although i wrote a fair share of implementation ==> for each spark partition by making a connection object , a message producer and then iterating over the rows of spark dataFrame for that partition and sending them to messageProducer.send() of jms api and my solace on docker received those events which i sent in distributed way.
    although i need to figure out some cases and improve efficiency but this has been a big boost . Will get back shortly to you guys.

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 579 admin
    edited May 2020 #5

    Awesome @harry . Looking forward to seeing the results!

    A couple thoughts... I don't know Spark, but I do know that creating new Connections and Producers is kind of an "expensive" operation (vs. sending a single message is "cheap"), so hopefully those objects are somewhat long-lived, and you're not creating and destroying Connections constantly... more efficient for the connection to remain up as long as possible, and just send the rows as messages as you iterate over. I think that sounds like what you're doing, but just wanted to be clear.

    Are you sending to a dynamic topic? E.g. pulling out some piece of data from the row (id?) and inserting that into the topic that you're publishing on? E.g. spark/data/$id/update or something..?

  • harry
    harry Member Posts: 12

    hey @Aaron , yeah the connection and producer is created per partition and closed per partition, so it has the upper limit of number of spark partitions which we can control, which is generally not much and i can introduce some throttling here. and to answer your question , yes! i am iterating over rows per partition and sending the rows as message as i iterate over, and no the topic is not dynamic in my case , so i think we are good. Couple of questions?
    Q1) Does solace (and i am assuming it does) have a connection pool and limit to how many connections any client can make with solace using the jms Api's?
    Q2) Since spark can process large amount of data as it is distributed over several nodes , let's say i read huge data and try to send it over solace , i don't want to bombard solace's queue's with my messages/events. how does solace control it. Like it sotres the incoming events in a buffer? and then pass on to broker ? how does it handle very huge number of events coming to it in a very short span, what can a client do/configure to avoid this?
    thanks.

  • harry
    harry Member Posts: 12
    edited May 2020 #7

    @Ken Overton i looked at the link, i think we both are doing the same thing when it comes to the CORE code with map partitions and iterating over rows. i have couple of issues need to be figured out
    Q1) how would you ensure an event at time t0 ( breach happened) will arrive at solace before t0 +delta( nothing to worry , all clear for breach, system error) when sending rows from a static data frame to solace.
    Q2) the same thing i mentioned above as to how to control flow so as not to bombard solace with huge amount of messages from dataframe as (the data is distributed and it can be millions of rows or even more).

    For question 1 , what if i assign a (sequence number +time) as id and do a logical partition by object id (so that same type of events falls in same partition and arrive almost immediately ) but this might lead to man y partitions , i am not sure?
    Any insights bud?
    @Pankaj Arora @Aaron

  • harry
    harry Member Posts: 12

    @Aaron @Pankaj Arora @Ken Overton @marc Thanks for your help guys. I wrote the connector, it is working fine for months. Wrote a blog on the same. Do check it out whenever you have time.
    https://medium.com/@harrysingh.nitj/spark-solace-connector-writing-to-solace-in-a-distributed-way-using-spark-3e5ff73cee04

  • [Deleted User]
    [Deleted User] Posts: 0

    Hi Harry, I'm embarrassed to say I've only noticed this thread just now. Responding now before looking it over because I know what happens wh

    Will have a look at your connector, as mine was very basic and the gaps/questions you raise are all spot on. Not sure when I'll have enough time to give it my full attention but definitely will engage and give you my thoughts. Cheers!

  • harry
    harry Member Posts: 12

    Hey Kov, no problem, your link was help enough at that point.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 954 admin

    @harry said:
    @Aaron @Pankaj Arora @Ken Overton @marc Thanks for your help guys. I wrote the connector, it is working fine for months. Wrote a blog on the same. Do check it out whenever you have time.
    https://medium.com/@harrysingh.nitj/spark-solace-connector-writing-to-solace-in-a-distributed-way-using-spark-3e5ff73cee04

    Wow that's awesome @harry! thanks for sharing!

  • In my case, I was interested in making a fully-agnostic JMS binding (all the existing ones had proprietary bits in them). If I preferred something to integrate more tightly, improve performance, I'd code it to the native Solace API. JMS persistence is more appropriate to point-to-point queueing, whereas Spark's windowing functions can leverage asynchronous streaming acknowledgments that are not strictly JMS compliant but are nonetheless persistence-backed. If we look at a native JCSMP version of the binding, it opens more of these possibilities for improved performance.