Integrating the Solace Kafka Sink Connector with Confluent Schema Registry
Avro is a common message format used for Kafka records. Large Kafka implementations leveraging Avro often use the Confluent Schema Registry to ensure uniform record schemas across publishers and subscribers.
When using the Solace PubSub+ Kafka Connectors to publish Kafka records as Solace PubSub+ events, it may be necessary to integrate with a schema registry in order to deserialize records, for example to dynamically synthesize Solace topics using data in a record’s payload. This post covers how to integrate the Solace Kafka Connectors with the Confluent Schema Registry using the Confluent Avro Converter, in order to automatically (de)serialize Avro messages using schemas from the Schema Registry.
Required Components:
Solace PubSub+ Sink Connector for Kafka
Confluent Kafka Connect Avro Converter zip
Procedures to leverage the Kafka Connect Avro Converter for message deserialization:
- Modify the Solace PubSub+ Connector for Kafka: Sink files:
  - Open ~/pubsubplus-connector-kafka-sink-2.x.x/etc/solace_sink.properties (or solace_sink_properties.json if using the JSON flavor).
  - Comment out line 10 of the file using the # character: value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
  - Insert a new line instructing the connector to use the Avro Converter: value.converter=io.confluent.connect.avro.AvroConverter
  - Insert another new line telling the converter where to find the Schema Registry: value.converter.schema.registry.url=SCHEMA_REGISTRY_URL (replace SCHEMA_REGISTRY_URL with the URL of your Schema Registry; this property takes a URL, not a converter class name).
  - Save and close the solace_sink.properties file.
- Add the Guava dependency to the Solace PubSub+ Connector for Kafka: Sink libraries:
  - Manually move guava-XX.X.X-jre.jar from its download location to the ~/pubsubplus-connector-kafka-sink-2.x.x/lib directory.
  - NOTE: If you would prefer to skip the CLASSPATH step ("Prepare additional dependencies") and manually add the dependencies to the ~/pubsubplus-connector-kafka-sink-2.x.x/lib/ directory, these are the additional jars that must be moved:
    - kafka-connect-avro-converter-7.0.1.jar
    - kafka-schema-registry-client-7.0.1.jar
    - kafka-schema-serializer-7.0.1.jar
    - kafka-avro-serializer-7.0.1.jar
    - kafka-connect-avro-data-7.0.1.jar
- Prepare additional dependencies: leveraging the Avro Converter requires several new Java dependencies. We will obtain the majority of them from the Confluent Kafka Connect Avro Converter libraries.
  - Unzip the contents of confluentinc-kafka-connect-avro-converter-X.X.X.zip to your desired directory on the target machine. For example, we will place them directly off the root: /confluentinc-kafka-connect-avro-converter-7.0.1/
  - Add the /confluentinc-kafka-connect-avro-converter-7.0.1/lib directory to your Java CLASSPATH so the Solace PubSub+ Connector for Kafka: Sink will know where to find the new dependencies. Explicitly setting CLASSPATH replaces the default classpath, so we also have to provide the location of the jars in our original Solace Connector lib directory. Adjust the paths to match your own install locations; in the example below, the converter was unzipped under /confluent-hub and the connector under /kafka/connectors:
    export CLASSPATH=/kafka/connectors/pubsubplus-connector-kafka-sink-2.1.0/lib/guava-31.0.1-jre.jar:/confluent-hub/confluentinc-kafka-connect-avro-converter-7.0.1/lib/*
  - NOTE: If you have additional jar dependencies, add them to the CLASSPATH at this stage.
- Deploy the customized Solace PubSub+ Connector for Kafka: Sink (~/pubsubplus-connector-kafka-sink-2.x.x/) to the target system following the Deployment Guide in the Connectors readme.
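The edits to solace_sink.properties described in the first step can be sketched as a short shell script. This is only an illustration under stated assumptions: it runs against a scratch file with stand-in contents rather than a real connector install, it matches the ByteArrayConverter line by content rather than by line number, and the Schema Registry address (http://localhost:8081) is a hypothetical placeholder for your own registry's URL.

```shell
# Scratch copy with stand-in contents (hypothetical), so the sketch is safe to run anywhere.
props="$(mktemp)"
printf '%s\n' \
  'name=solaceSinkConnector' \
  'value.converter=org.apache.kafka.connect.converters.ByteArrayConverter' > "$props"

# Hypothetical Schema Registry address; replace with the URL of your own registry.
SCHEMA_REGISTRY_URL="http://localhost:8081"

# Comment out the ByteArrayConverter line by prefixing it with '#'.
sed -i.bak 's|^value\.converter=org\.apache\.kafka\.connect\.converters\.ByteArrayConverter|#&|' "$props"

# Append the Avro Converter class and the registry URL it should use.
printf '%s\n' \
  'value.converter=io.confluent.connect.avro.AvroConverter' \
  "value.converter.schema.registry.url=${SCHEMA_REGISTRY_URL}" >> "$props"

cat "$props"
```

To apply the same edits to a real install, point props at your own copy of solace_sink.properties instead of creating the scratch file; the .bak file left by sed keeps the original for comparison.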
Once started, the Solace PubSub+ Sink Connector for Kafka reads each record from Kafka using the Avro Converter instead of the generic Byte Array Converter. For each message, the Avro Converter reads the schema ID from the record and obtains the correct schema from the Schema Registry to deserialize the Avro message. The contents of the Avro message can now be parsed and acted upon by a custom RecordProcessor class as described in the Connectors readme.
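To make the "reads the schema ID from the record" step more concrete: records written by Confluent's Avro serializer start with a one-byte magic value of 0, followed by a four-byte big-endian schema ID, followed by the Avro-encoded data. The shell sketch below decodes that header from a hand-built sample. The sample bytes, the dummy payload, and the schema ID of 42 are made up for illustration; the converter does this work for you inside Kafka Connect.

```shell
# Build a made-up framed record: magic byte 0, schema ID 42 as four big-endian bytes,
# then dummy payload bytes standing in for the Avro-encoded data.
record_file="$(mktemp)"
printf '\000\000\000\000\052dummy-avro-bytes' > "$record_file"

# Read the five header bytes as unsigned decimal values into $1..$5.
set -- $(head -c 5 "$record_file" | od -An -tu1)

magic_byte=$1
# Combine bytes 2-5 into one big-endian integer (256^3, 256^2, 256, 1).
schema_id=$(( $2 * 16777216 + $3 * 65536 + $4 * 256 + $5 ))

echo "magic byte: ${magic_byte}"
echo "schema ID:  ${schema_id}"
```

With a real record, the decoded ID is what gets looked up in the Schema Registry (its REST API serves schemas by ID under /schemas/ids/), which can be handy when checking by hand why a particular record fails to deserialize.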
Additional Information:
Confluent talks about serialization and deserialization of records
Comments
- Thanks for sharing the procedures, @JamiesonWalker!