Kafka Solace Sink Connector - Received record that had no data....discarded
Hello,
I'm configuring a Solace Sink connector for Kafka with the following properties -
"connector.class": "com.solace.sink.connector.SolaceSinkConnector",
"tasks.max": "2",
"topics": "nt_pop_fxrt_002_001",
"sol.host": "xx.xx.xxx.xxx",
"sol.username": "syskaf",
"sol.password": "",
"sol.vpn_name": "sysvpn16",
"sol.queue": "POP.SRA.PUBSUB.IN",
"sol.record_processor_class": "com.solace.sink.connector.recordprocessor.SolSimpleRecordProcessor",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
The connector initializes without any issues. However, whenever I publish any data to the Kafka topic I get the following error message -
[2019-10-29 14:53:36,661] INFO ==============Received record that had no data....discarded (com.solace.sink.connector.SolaceSinkSender)
Does the Sink connector not support the string converter for Kafka Connect or is there some other configuration I'm missing?
Thanks,
Anthony
Answers
The record processors included in the connector project are meant to be simple samples. Users are expected to modify them to match their specific requirements. You can add as many record processors as you want to the project; the connector's configuration file specifies the desired record processor, which is resolved and loaded at run time.
The simple record processor is the sample processor you are using with the sink connector. It was written to send the Kafka record as a binary payload (i.e. the Kafka record is published directly as a Solace binary payload). With Solace, the binary payload is expected to be a byte array. Therefore, you should use the following converters in the configuration:
"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
If the Kafka record is a string, then the Solace consumer would still be able to decode the message as a string. However, the sample record processor assumes a byte array for the connector record.
So you can change the configuration to something similar to the above. Your string records from Kafka will then arrive in Solace as binary payloads, and the consumer can recover the original text by converting the payload back to a string.
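To illustrate why the byte array route loses nothing, here is a minimal sketch of the round trip. It does not use the Kafka or Solace APIs; the class and method names are hypothetical, and it assumes the Kafka producer wrote the string as UTF-8.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the connector project.
class PayloadRoundTrip {

    // What the producer side effectively does to a string record
    // (assumption: UTF-8 encoding).
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    // What a Solace consumer would do with the binary payload it receives.
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("sample record");
        // The bytes pass through the connector unchanged, so decoding
        // gives back the original text.
        System.out.println(decode(payload));
    }
}
```

If the producer uses a different character set, the consumer would need to decode with that same character set.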
Alternatively, you can modify the simple processor to check whether the record is a string and, if so, send it as a string in Solace. Because it currently does not check for string records, no payload is formed to send to Solace, and you get the logged message you quoted. Once the record processor includes that check, your StringConverter configuration would work properly.
Ultimately, we encourage people to create their own record processors and add them to their project rather than using the simple samples included. Regardless, changing the converters to ByteArrayConverter would still deliver strings to Solace if the original Kafka record is a string.
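The type check could look roughly like the sketch below. This is not the code of SolSimpleRecordProcessor; the class and method names are hypothetical, and the method takes a plain Object standing in for the value of the Kafka sink record. It also takes a slightly different route from sending a Solace string message: it encodes a string value as UTF-8 bytes (an assumption) so that the existing binary payload path can be reused.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the connector project.
class RecordValuePayload {

    // Returns the bytes to publish, or null if no payload can be formed.
    static byte[] toPayload(Object value) {
        if (value instanceof byte[]) {
            // ByteArrayConverter case: pass the bytes through unchanged.
            return (byte[]) value;
        }
        if (value instanceof String) {
            // StringConverter case: encode the text (assumption: UTF-8).
            return ((String) value).getBytes(StandardCharsets.UTF_8);
        }
        // Null or unsupported type: the caller would discard the record.
        return null;
    }

    public static void main(String[] args) {
        System.out.println(toPayload("hello").length);      // string value
        System.out.println(toPayload(new byte[4]).length);  // byte array value
        System.out.println(toPayload(42) == null);          // unsupported value
    }
}
```

In a real record processor the null result would correspond to the "no data" case that the connector currently logs and discards.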