Integrating the Solace Kafka Sink Connector with Confluent Schema Registry

JamiesonWalker Member, Employee Posts: 2 Solace Employee
edited February 26 in Connectors & Integrations #1

Avro is a common message format used for Kafka records. Large Kafka implementations leveraging Avro often use the Confluent Schema Registry to ensure uniform record schemas across publishers and subscribers.   

When using the Solace PubSub+ Kafka Connectors to publish Kafka records as Solace PubSub+ events, it may be necessary to integrate with a schema registry in order to deserialize records. For example, to dynamically synthesize Solace topics using data in a record’s payload. This post covers how to integrate the Solace Kafka Connectors with the Confluent Schema Register using the Confluent Avro Converter, in order to automatically (de)serialize Avro messages using schemas from the Schema Registry. 

Required Components:

Solace PubSub+ Sink Connector for Kafka 

Confluent Kafka Connect Avro Converter zip  

Google Guava jar  

Procedures to leverage the Kafka Connect Avro Converter for message deserialization: 

  1. Modify the Solace PubSub+ Connector for Kafka: Sink files: 
    1. Open ~/pubsubplus-connector-kafka-sink-2.x.x/etc/ solace_sink_properties.json if using json flavor). 
    2. Comment out line 10 of the file using the # character – value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
    3. Insert a new line, instructing the connector to use the Avro Converter – value.converter=io.confluent.connect.avro.AvroConverter  
    4. Insert another new line specifying the type of conversion needed – value.converter.schema.registry.url=io.confluent.connect.avro.AvroConverter
    5. Save and close the file. 
  2. Add the Guava dependency to the Solace PubSub+ Connector for Kafka: Sink libraries: 
    1. Manually move the guava-XX.X.X-jre.jar from its download location to the ~/pubsubplus-connector-kafka-sink-2.x.x/lib directory  
    2. NOTE: If you would prefer to skip step 2 and manually add the dependencies to the ~/pubsubplus-connector-kafka-sink-2.x.x/lib/ directory, these are the additional jars that must be moved: 
  3. Prepare additional dependencies – To leverage the Avro Converter there are several new java dependencies. We will obtain the majority of them from the Confluent Kafka Connect Avro Converter libraries.  
    1. Unzip the contents of the folder to your desired directory on the target machine. For example, we will place them directly off the root: /confluentinc-kafka-connect-avro-converter-7.0.1/ 
    2. Add the /confluentinc-kafka-connect-avro-converter-7.0.1/libs directory to your java CLASSPATH so the Solace PubSub+ Connector for Kafka: Sink will know where to find the new dependencies. We also have to provide the location of our original Solace Connector lib directory as we blow away, the default process when explicitly setting CLASSPATH. export CLASSPATH=/kafka/connectors/pubsubplus-connector-kafka-sink-2.1.0/lib/guava-31.0.1-jre.jar:/confluent-hub/confluentinc-kafka-connect-avro-converter-7.0.1/lib/* NOTE: If you have additional jar dependencies add them to the CLASSPATH at this stage.
      • kafka-connect-avro-converter-7.0.1.jar
      • kafka-schema-registry-client-7.0.1.jar
      • kafka-schema-serializer-7.0.1.jar
      • kafka-avro-serializer-7.0.1.jar
      • kafka-connect-avro-data-7.0.1.jar
  4. Deploy the customized Solace PubSub+ Connector for Kafka: Sink (~/pubsubplus-connector-kafka-sink-2.x.x/) to the target system following the Deployment Guide in the Connectors readme

Once started, the Solace PubSub+ Sink Connector for Kafka reads each record from Kafka using the Avro Converter instead of the generic Byte Array Converter. For each message, the Avro Converter reads the schema ID from the record and obtains the correct schema from the Schema Registry to deserialize the Avro message. The contents of the Avro message can now be parsed and acted upon by a custom RecordProcessor class as described in the Connectors readme

Additional Information:

Confluent talks about serialization and deserialization of records