How to configure properties in Solace-Kafka-Sink-Connector

서준하 Member Posts: 4

Hi,

I connected a message broker to Confluent Kafka using the Solace-Kafka-Sink-Connector.
https://github.com/SolaceLabs/pubsubplus-connector-kafka-sink
Now I am testing how the mapping between Kafka topics and Solace topics works.

I test by changing the topics, sol.topics, and sol.queue properties in the solace.properties file and then checking whether messages are sent and received.

When setting up Solace topics and queues, if I set more than one topic in sol.topics, messages are sent only to the first topic. If I set more than one queue in sol.queue, the connection is lost and no messages are sent at all.

For example, with the following settings in the solace.properties file, only 'testtopic01' receives the messages from 'test.solace01' and 'test.solace02'.

topics=test.solace01, test.solace02
sol.topics=testtopic01, testtopic02

Is there any way to match multiple topics or queues?

Best Answer

  • [Deleted User] Posts: 0
    #2 Answer ✓

    This is a great question; unfortunately, the answer is "it depends". ;) Since there are multiple alternatives, this will be a bit of a long response.

    There are a couple of ways of matching Solace and Kafka Topics. Generally, we have found that clients use different formats or schemas for the records in different Kafka Topics. Therefore, it is not just a matter of mapping the Topics between the two systems but potentially also matching the record processing requirements.

    To deal with the record processing issue, the Solace Sink Connector allows multiple record processors (each mapping a Kafka Record to a Solace message) to be created as simple plugin classes. These plugin classes are all available to the Solace Sink Connector; the configuration file defines which record processing class to load, and the appropriate record processor is automatically instantiated at run-time. The intention is to allow multiple instances of the same Sink Connector to be deployed into the Connect cluster simply by using different configuration files.

    Therefore, if you want to map two Kafka Topics to two different Solace Topics and/or Queues, you would simply deploy the Sink Connector twice, each instance with its own topic mapping and a reference to the appropriate record processor plugin (and, of course, a different name in the configuration file).
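
    For illustration only, the two deployments might differ in nothing but their name, topic mapping, and record processor. The property names below follow the question's solace.properties; the record-processor key and the class names are assumptions and should be checked against the connector README:

    # solace-sink-a.properties (hypothetical example)
    name=solaceSinkA
    topics=test.solace01
    sol.topics=testtopic01
    # the exact record-processor property name and class are assumptions; see the README
    sol.record_processor_class=com.example.MyRecordProcessorA

    # solace-sink-b.properties (hypothetical example)
    name=solaceSinkB
    topics=test.solace02
    sol.topics=testtopic02
    sol.record_processor_class=com.example.MyRecordProcessorB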

    Architecturally, if the multiple Kafka topics are using the same record format, there may be no need to map them to separate Solace Topics at all; the records from the two Kafka Topics could potentially be published on the same Solace Topic. The concept of a Topic is very different between Solace and Kafka. With Solace, the Topic is hierarchical metadata describing the payload. The broker uses this metadata to route messages to applications that have registered subscription interest; routing is based on the hierarchical structure, and clients can use wildcards to register interest in specific data. With Kafka, a Topic is a physical reference to the storage of records, distributed among multiple partitions. Stream processors are required to further segment topics: data is scanned and refined from one topic and placed in another to provide routing-like functionality.

    As a result, if multiple Kafka topics all use the same record format, they could all be published on the same Solace Topic. The Sink Connector also supports sending the same record from Kafka to multiple Solace Topics and a Queue, to deal with scenarios where legacy systems use more than one Solace topic or queue for the same payload, or to overcome cases where the Solace design did not follow best practices for topic hierarchies.
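
    As a rough sketch of that fan-out (the queue name here is hypothetical, and the exact multi-destination semantics should be confirmed against the README), a single configuration like the following is intended to publish every consumed record to both Solace topics and the queue:

    topics=test.solace01, test.solace02
    # each record is published to both Solace topics...
    sol.topics=testtopic01, testtopic02
    # ...and also to this (hypothetical) queue, which must already exist on the broker
    sol.queue=testqueue01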

    However, we can also solve the issue another way with the Solace Sink Connector. In some cases the Kafka Records may share the same format but need to map to different Solace topics within the Solace Topic Hierarchy. Rather than requiring Kafka to perform stream processing to create filtered subsets of the data and then deploying a different Sink Connector for each Topic holding a refined subset, the Sink Connector can eliminate the additional Kafka network traffic, processing, and stream processes by using a feature called "Dynamic Destinations" (please refer to the README for more details).

    Dynamic Destinations are special record processors that are available for the Solace Sink Connector. When the configuration parameter indicating "dynamic destinations" is set, the Solace Topic and Queue configuration parameters are ignored. This setting allows the record processor to dynamically set the Solace message Topic AND define the record-to-message mapping in the same Record Processor. Since the Topic metadata describes the payload, the hierarchy is generally defined in logic in the record processor, and separate record processing can be defined for each outbound Solace message within a single Solace Sink Connector. All of this is possible with a single dynamic destination record processor.
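
    A minimal configuration sketch for this mode might look like the following; the sol.dynamic_destination and record-processor property names, and the class name, are assumptions to be verified against the README:

    topics=test.solace01, test.solace02
    # with dynamic destinations enabled, sol.topics and sol.queue are ignored
    sol.dynamic_destination=true
    # hypothetical processor class; it chooses the Solace destination per record
    sol.record_processor_class=com.example.MyDynamicDestinationRecordProcessor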

    All of the Kafka Record details are available to the dynamic destination record processor (Topic, Key, headers, Schema, etc.). So if all you need is to map Kafka Topics to Solace Topics, a simple dynamic destination record processor can be used. The record processor simply needs to look at the topic the record arrived on and send it to the appropriate Solace topic.
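
    As a minimal sketch of that idea, a hypothetical processor might look like the following. The interface name, method signature, package, and the "dynamicDestination" user-property convention are modelled on the sample processors in the connector source and should be verified there before use:

    // Hypothetical example class and package names; the SolRecordProcessorIF
    // interface, its package, and the "dynamicDestination" user-property key are
    // assumptions modelled on the connector's bundled sample processors.
    package com.example;

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.connect.sink.SinkRecord;

    import com.solace.connector.kafka.connect.sink.SolRecordProcessorIF; // assumed package
    import com.solacesystems.jcsmp.BytesXMLMessage;
    import com.solacesystems.jcsmp.JCSMPFactory;
    import com.solacesystems.jcsmp.SDTException;
    import com.solacesystems.jcsmp.SDTMap;
    import com.solacesystems.jcsmp.Topic;

    public class MyDynamicDestinationRecordProcessor implements SolRecordProcessorIF {

        @Override
        public BytesXMLMessage processRecord(String skey, SinkRecord record) {
            BytesXMLMessage msg =
                JCSMPFactory.onlyInstance().createMessage(BytesXMLMessage.class);

            // Map the Kafka topic the record arrived on to a Solace topic.
            // Any topic-hierarchy logic (e.g. levels derived from the key or
            // headers) would go here instead of this simple lookup.
            String solaceTopicName =
                "test.solace01".equals(record.topic()) ? "testtopic01" : "testtopic02";
            Topic solaceTopic = JCSMPFactory.onlyInstance().createTopic(solaceTopicName);

            try {
                // In dynamic-destination mode the connector reads the target
                // destination from this user property on the outbound message.
                SDTMap userMap = JCSMPFactory.onlyInstance().createMap();
                userMap.putDestination("dynamicDestination", solaceTopic);
                msg.setProperties(userMap);
            } catch (SDTException e) {
                throw new RuntimeException("Failed to set dynamic destination", e);
            }

            // Simple record-to-message mapping: carry the record value as bytes.
            Object value = record.value();
            byte[] payload = (value instanceof byte[])
                ? (byte[]) value
                : String.valueOf(value).getBytes(StandardCharsets.UTF_8);
            msg.writeAttachment(payload);

            return msg;
        }
    }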

    The Solace Sink Connector source code provides examples of both regular and dynamic destination record processors. Again, the same Sink Connector is used in both cases; whether a regular or a dynamic record processor is applied is defined in the configuration at runtime. Dynamic destinations allow a single record processor in the Solace Sink Connector to deal with multiple Solace destinations and multiple record mappings, so only a single Connector needs to be deployed. However, multiple dynamic record processors can still be made available when the Sink Connector is deployed.

    If you are at the Kafka Summit in San Francisco next week, please stop by. We will be demonstrating routing of messages from a single Kafka Topic via the Solace Sink Connector and a dynamic record processor. Solace natively provides the connectivity to RabbitMQ, AWS SQS (via REST), and multiple MQTT IoT devices. Based solely on the details in the different Kafka Records in a single Kafka Topic, and a single simple record processor in the Solace Sink Connector, each Kafka Record is dynamically sent to the appropriate downstream system. All this via a single Solace Connector and the dynamic destination record processing plugin, with no stream processors or additional connectors.

Answers

  • 서준하 Member Posts: 4

    Thank you for the detailed answer. Sorry for the late response; it took me a while to read and apply it. I have another question.

    I read the description of tasks (https://docs.confluent.io/current/connect/concepts.html),
    but I don't understand how to apply it to the properties file.
    How does the connector's behavior change when the tasks.max value in the solace.properties file changes?