BOOMI : Publish Events to PubSub+ with custom headers & Listen to events and get headers

Thomas Manson
Thomas Manson Member Posts: 19 ✭✭✭
edited August 2021 in Connectors & Integrations #1

I write this post to share how to do the following things in Boomi :

  • Publish a message to Solace with custom headers
  • Listen for message and get the headers, parsing the custom headers stored in "userProperties" header

Boomi is an iPaaS solution (other well known iPaaS : TIBCO Cloud Integration, Mulesoft)

You can read about Solace Headers here :

Post a message with customer header in BOOMI

First be sure to select the following connector to connect to Solace :

"Solace PubSub+ Platform - Partner connector"

This connector is developed by Solace.

Pass static key=value pairs

Since the version 220 published on the 10th of July you can pass static key value pairs as headers from the "Operation" :

The result is the following :
your message is published with the standard headers + static_header_key=static_header_value header.

In a future release, you'll also be able to pass dynamic data as well.

Pass dynamic key value pairs

As of now, to pass dynamic data, you need to

  • Use the "Set Property" Shape
  • Type : Document property
  • Source Type : connector
  • connectors : select your "Solace PubSub+ Platform - Partner connector" instance
  • Property : User Property

As a value, you need to pass a JSON object with key/value pair;
This JSON string object should be built in your Boomi process prior to using the "Set Property" shape, and use a dynamic value.

(on the screenshot, I'm using a static value)

Exemple of value :

{"foo":"bar","baz":"fubar"}

The result is the following :
your message is published with the standard headers + the foo=bar & baz=fubar headers.

Listen to Events & Get Headers in BOOMI

Connection

be sure to use
"Solace PubSub+ Platform - Partner connector"

Add a groovy 2.4 custom script 'Data Process Shape'

Add the following code :

import java.util.Properties;
import java.io.InputStream;
import groovy.json.JsonSlurper;
import com.boomi.execution.ExecutionUtil;

logger = ExecutionUtil.getBaseLogger();

for( int i = 0; i < dataContext.getDataCount(); i++ ) 
{
    InputStream is                   = dataContext.getStream(i);
    Properties  props                = dataContext.getProperties(i);
    Set<String> keys                 = props.stringPropertyNames();
    String      solaceUserProperties = null;

    for(key in keys)
    {
       if(key.endsWith(".userProperties"))
       {//Parsing JSON object representing userProperties and set it as individual DDP
            solaceUserProperties = props.getProperty(key);
            logger.info("Solace UserProperties found key='"+key+"', value='"+solaceUserProperties+"'");

            def jsonSlurper = new JsonSlurper();
            def object      = jsonSlurper.parseText(solaceUserProperties);

            Set<String> objectKeys = object.keySet();

            for(key2 in objectKeys)
            {
                logger.info("Solace Properties found key='"+key+"', value='"+object.get(key2)+"'");
                props.setProperty("document.dynamic.userdefined."+key2,object.get(key2));
            }
       }
       else
       {
           logger.fine("Solace Other Properties found key='"+key+"', value='"+props.getProperty(key)+"'");
           //ex: 'connector.track.solacetpp-VMLLNA-solace-prod.isElidingEligible', we just want the last part
           String subKey = key.substring(key.lastIndexOf(".")+1);
           props.setProperty("document.dynamic.userdefined.solace_"+subKey,props.getProperty(key));
       }
    }
    logger.info(props.toString());
    dataContext.storeStream(is, props);
}

This code will do the following :

  • Add all headers as Dynamic Document Property under the name "solace__HEADER_KEY_"
  • Parse userProperties json and add as Dynamic Document Property under the name "key"

Here are the list of standard headers:

key='connector.track.solacetpp-VMLLNA-solace-prod.senderTimestamp', value='null'
key='connector.track.solacetpp-VMLLNA-solace-prod.messageIDLong', value='4'
key='connector.track.solacetpp-VMLLNA-solace-prod.userProperties', value='{"static_header_key":"static_header_value_xyz","foo":"bar","baz":"fubar"}'
key='connector.track.solacetpp-VMLLNA-solace-prod.dataType', value='track'
key='connector.track.solacetpp-VMLLNA-solace-prod.sourceType', value='connector'
key='connector.track.solacetpp-VMLLNA-solace-prod.redelivered', value='false'
key='connector.track.solacetpp-VMLLNA-solace-prod.cos', value='0'
key='connector.track.solacetpp-VMLLNA-solace-prod.type', value='solacetpp-VMLLNA-solace-prod'
key='connector.track.solacetpp-VMLLNA-solace-prod.priority', value='4'
key='connector.track.solacetpp-VMLLNA-solace-prod.timeToLive', value='0'
key='connector.track.solacetpp-VMLLNA-solace-prod.isReplyMessage', value='false'
key='connector.track.solacetpp-VMLLNA-solace-prod.isDMQEligible', value='true'
key='connector.track.solacetpp-VMLLNA-solace-prod.deliveryMode', value='DIRECT'
key='connector.track.solacetpp-VMLLNA-solace-prod.discardIndication', value='false'
key='connector.track.solacetpp-VMLLNA-solace-prod.messageID', value='4'
key='connector.track.solacetpp-VMLLNA-solace-prod.receiveTimestamp', value='0'
key='connector.track.solacetpp-VMLLNA-solace-prod.destination', value='ride/request'
key='connector.track.solacetpp-VMLLNA-solace-prod.expiration', value='0'
key='connector.track.solacetpp-VMLLNA-solace-prod.isElidingEligible', value='false'

Access headers as Dynamic Document Properties

To Acccess the "foo" userProperties :


To access the "destination" property :

Logging result

Hope this helps,
Thomas.

Tagged:

Comments

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 959 admin

    Awesome, thanks for sharing @Thomas Manson 🙏

  • noeljoaquin
    noeljoaquin Member Posts: 2

    Hi Thomas, thanks for sharing this! I noticed this changes yesterday on the connector operation and began to play with it.
    I've tried putting in some properties in there, setting them as Dynamic Document Property/Dynamic Process Property/Static value, etc.
    Unfortunately, none of them work (even the static key/value pairs), I wasn't able to access it on the Boomi Listener process. How do you access those values on the Boomi Listener process?

  • Thomas Manson
    Thomas Manson Member Posts: 19 ✭✭✭

    Hi Noel,

    I'll share you how to do that this week !

    I've used Spring Cloud Stream code to get the events header and check that it works when you're publishing it with Boomi.

    On the listen part, there's a limitation in Boomi's SDK, that doesn't make it easy to get the headers.
    But I'll share how to do that later this week.

    Thomas.

  • noeljoaquin
    noeljoaquin Member Posts: 2

    Hi Thomas,

    I have spoken to Andrew Mackenzie last week and he mentioned that those key/value pairs are also added to the Solace branded connector's document property 'User Properties' together with other key/value pairs we set on the same property through the set properties shape.

    With this, I was able to retrieve the properties already on the listener part (getting from the Solace branded connector's document property "User Properties"). I've written a short script to parse and re-assign these key/value pairs back to be able to use them.

    Thanks so much!

  • Thomas Manson
    Thomas Manson Member Posts: 19 ✭✭✭
    edited July 2021 #6

    Hi Noel,

    This kind of script (Groovy 2.4) on a "Data Process Shape"/"Custom Scripting".
    I'll document it more explicitly later

    import java.util.Properties;
    import java.io.InputStream;
    import groovy.json.JsonSlurper;
    
    for( int i = 0; i < dataContext.getDataCount(); i++ ) 
    {
        InputStream is    = dataContext.getStream(i);
        Properties  props = dataContext.getProperties(i);
    
        def jsonSlurper = new JsonSlurper();
        def object = jsonSlurper.parseText(props.getProperty("document.dynamic.userdefined.ddpUserPropertiesJSON"));
    
    //  foo, baz, static_header_key - these are the three values being passed as JSON keys in the
    //  Solace Connector userProperties JSON string.
    
    //  foo, baz, static_header_key- these are the three DDPs created in the receiving
    //  Boomi Process with  values set to the corresponding values from the user propertie JSON.  Note that
    //  these DDPs are created here and do not need to be created ahead of time.
    //  The developer needs to know the JSON keys coming in in this code.
    
        props.setProperty("document.dynamic.userdefined.foo",object.foo);
        props.setProperty("document.dynamic.userdefined.baz",object.baz);
        props.setProperty("document.dynamic.userdefined.static_header_key",object.static_header_key);
    
        dataContext.storeStream(is, props);
    }
    
  • Thomas Manson
    Thomas Manson Member Posts: 19 ✭✭✭

    I've updated the orignal post with a solution to get headers from the message using BOOMI Listen operation.

  • Thomas Manson
    Thomas Manson Member Posts: 19 ✭✭✭

    I've updated the post with an excel file referencing most of the available headers