BOOMI : Publish Events to PubSub+ with custom headers & Listen to events and get headers

I write this post to share how to do the following things in Boomi :

  • Publish a message to Solace with custom headers
  • Listen for message and get the headers, parsing the custom headers stored in “userProperties” header

Boomi is an iPaaS solution (other well known iPaaS : TIBCO Cloud Integration, Mulesoft)

You can read about Solace Headers here :

Post a message with customer header in BOOMI

First be sure to select the following connector to connect to Solace :

“Solace PubSub+ Platform - Partner connector”

This connector is developed by Solace.

Pass static key=value pairs

Since the version 220 published on the 10th of July you can pass static key value pairs as headers from the “Operation” :

The result is the following :
your message is published with the standard headers + static_header_key=static_header_value header.

In a future release, you’ll also be able to pass dynamic data as well.

Pass dynamic key value pairs

As of now, to pass dynamic data, you need to

  • Use the “Set Property” Shape
  • Type : Document property
  • Source Type : connector
  • connectors : select your “Solace PubSub+ Platform - Partner connector” instance
  • Property : User Property

As a value, you need to pass a JSON object with key/value pair;
This JSON string object should be built in your Boomi process prior to using the “Set Property” shape, and use a dynamic value.

(on the screenshot, I’m using a static value)

Exemple of value :

{“foo”:“bar”,“baz”:“fubar”}

The result is the following :
your message is published with the standard headers + the foo=bar & baz=fubar headers.

Listen to Events & Get Headers in BOOMI

Connection

be sure to use
“Solace PubSub+ Platform - Partner connector”

Add a groovy 2.4 custom script ‘Data Process Shape’

Add the following code :

import java.util.Properties;
import java.io.InputStream;
import groovy.json.JsonSlurper;
import com.boomi.execution.ExecutionUtil;

logger = ExecutionUtil.getBaseLogger();

for( int i = 0; i < dataContext.getDataCount(); i++ ) 
{
    InputStream is                   = dataContext.getStream(i);
    Properties  props                = dataContext.getProperties(i);
    Set<String> keys                 = props.stringPropertyNames();
    String      solaceUserProperties = null;
    
    for(key in keys)
    {
       if(key.endsWith(".userProperties"))
       {//Parsing JSON object representing userProperties and set it as individual DDP
            solaceUserProperties = props.getProperty(key);
            logger.info("Solace UserProperties found key='"+key+"', value='"+solaceUserProperties+"'");
           
            def jsonSlurper = new JsonSlurper();
            def object      = jsonSlurper.parseText(solaceUserProperties);
            
            Set<String> objectKeys = object.keySet();
            
            for(key2 in objectKeys)
            {
                logger.info("Solace Properties found key='"+key+"', value='"+object.get(key2)+"'");
                props.setProperty("document.dynamic.userdefined."+key2,object.get(key2));
            }
       }
       else
       {
           logger.fine("Solace Other Properties found key='"+key+"', value='"+props.getProperty(key)+"'");
           //ex: 'connector.track.solacetpp-VMLLNA-solace-prod.isElidingEligible', we just want the last part
           String subKey = key.substring(key.lastIndexOf(".")+1);
           props.setProperty("document.dynamic.userdefined.solace_"+subKey,props.getProperty(key));
       }
    }
    logger.info(props.toString());
    dataContext.storeStream(is, props);
}

This code will do the following :

  • Add all headers as Dynamic Document Property under the name “solace__HEADER_KEY_”
  • Parse userProperties json and add as Dynamic Document Property under the name “key”

Here are the list of standard headers:

key=‘connector.track.solacetpp-VMLLNA-solace-prod.senderTimestamp’, value=‘null’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.messageIDLong’, value=‘4’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.userProperties’, value=‘{“static_header_key”:“static_header_value_xyz”,“foo”:“bar”,“baz”:“fubar”}
key=‘connector.track.solacetpp-VMLLNA-solace-prod.dataType’, value=‘track’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.sourceType’, value=‘connector’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.redelivered’, value=‘false’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.cos’, value=‘0’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.type’, value=‘solacetpp-VMLLNA-solace-prod’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.priority’, value=‘4’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.timeToLive’, value=‘0’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.isReplyMessage’, value=‘false’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.isDMQEligible’, value=‘true’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.deliveryMode’, value=‘DIRECT’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.discardIndication’, value=‘false’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.messageID’, value=‘4’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.receiveTimestamp’, value=‘0’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.destination’, value=‘ride/request’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.expiration’, value=‘0’
key=‘connector.track.solacetpp-VMLLNA-solace-prod.isElidingEligible’, value=‘false’

Access headers as Dynamic Document Properties

To Acccess the “foo” userProperties :

To access the “destination” property :

Logging result

Hope this helps,
Thomas.

SMF Header Reference.xlsx (17.8 KB)

Awesome, thanks for sharing @Thomas_Manson ?

Hi Thomas, thanks for sharing this! I noticed this changes yesterday on the connector operation and began to play with it.
I’ve tried putting in some properties in there, setting them as Dynamic Document Property/Dynamic Process Property/Static value, etc.
Unfortunately, none of them work (even the static key/value pairs), I wasn’t able to access it on the Boomi Listener process. How do you access those values on the Boomi Listener process?

Hi Noel,

I’ll share you how to do that this week !

I’ve used Spring Cloud Stream code to get the events header and check that it works when you’re publishing it with Boomi.

On the listen part, there’s a limitation in Boomi’s SDK, that doesn’t make it easy to get the headers.
But I’ll share how to do that later this week.

Thomas.

Hi Thomas,

I have spoken to Andrew Mackenzie last week and he mentioned that those key/value pairs are also added to the Solace branded connector’s document property ‘User Properties’ together with other key/value pairs we set on the same property through the set properties shape.

With this, I was able to retrieve the properties already on the listener part (getting from the Solace branded connector’s document property “User Properties”). I’ve written a short script to parse and re-assign these key/value pairs back to be able to use them.

Thanks so much!

Hi Noel,

This kind of script (Groovy 2.4) on a “Data Process Shape”/“Custom Scripting”.
I’ll document it more explicitly later

import java.util.Properties;
import java.io.InputStream;
import groovy.json.JsonSlurper;

for( int i = 0; i < dataContext.getDataCount(); i++ ) 
{
    InputStream is    = dataContext.getStream(i);
    Properties  props = dataContext.getProperties(i);

    def jsonSlurper = new JsonSlurper();
    def object = jsonSlurper.parseText(props.getProperty("document.dynamic.userdefined.ddpUserPropertiesJSON"));

//  foo, baz, static_header_key - these are the three values being passed as JSON keys in the
//  Solace Connector userProperties JSON string.

//  foo, baz, static_header_key- these are the three DDPs created in the receiving
//  Boomi Process with  values set to the corresponding values from the user propertie JSON.  Note that
//  these DDPs are created here and do not need to be created ahead of time.
//  The developer needs to know the JSON keys coming in in this code.

    props.setProperty("document.dynamic.userdefined.foo",object.foo);
    props.setProperty("document.dynamic.userdefined.baz",object.baz);
    props.setProperty("document.dynamic.userdefined.static_header_key",object.static_header_key);

    dataContext.storeStream(is, props);
}

I’ve updated the orignal post with a solution to get headers from the message using BOOMI Listen operation.

I’ve updated the post with an excel file referencing most of the available headers