Having issues accessing SWIFT subscription using Solace/Kafka

Hey team, 

I'm trying to ingest data from swim.faa.gov using Solace/Kafka and I'm having issues authenticating: "com.solacesystems.jcsmp.JCSMPErrorResponseException: 403: Subscription ACL Denied - Topic 'solacetest' [Subcode:27]"


I'm using the following versions:

solace_version: 2.3.0
kafka_version: 2.3.0


SWIFT subscription from FAA:

providerUrl=
queue=
connectionFactory=
username=
password=
vpn=


Kafka Server configuration:

name=solaceConnectorTFMS

#connector.class=com.solace.source.connector.SolaceSourceConnector
connector.class=com.solace.connector.kafka.connect.source.SolaceSourceConnector

tasks.max=2
kafka.topic=tfms
sol.host=tcps://ems2.swim.faa.gov:55443
sol.username=
sol.password=
sol.vpn_name=TFMS
sol.topics=solacetest
sol.queue=
sol.message_callback_on_reactor=false
sol.message_processor_class=com.solace.connector.kafka.connect.source.msgprocessors.SolSampleSimpleMessageProcessor

sol.generate_send_timestamps=false
sol.generate_rcv_timestamps=false
sol.sub_ack_window_size=255
sol.generate_sequence_numbers=true
sol.calculate_message_expiration=true
sol.subscriber_dto_override=false
sol.channel_properties.connect_retries=-1
sol.channel_properties.reconnect_retries=-1
sol.kafka_message_key=DESTINATION
sol.ssl_validate_certificate=false
#sol.ssl_validate_certicate_date=false
#sol.ssl_connection_downgrade_to=PLAIN_TEXT
sol.ssl_trust_store=/opt/PKI/skeltonCA/heinz1.ts
sol.ssl_trust_store_password=sasquatch
sol.ssl_trust_store_format=JKS
#sol.ssl_trusted_command_name_list
sol.ssl_key_store=/opt/PKI/skeltonCA/heinz1.ks
sol.ssl_key_store_password=sasquatch
sol.ssl_key_store_format=JKS
sol.ssl_key_store_normalized_format=JKS
sol.ssl_private_key_alias=heinz1
sol.ssl_private_key_password=sasquatch
#sol.authentication_scheme=AUTHENTICATION_SCHEME_CLIENT_CERTIFICATE
key.converter.schemas.enable=true
value.converter.schemas.enable=true
#key.converter=org.apache.kafka.connect.converters.ByteArrayConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
#key.converter=org.apache.kafka.connect.json.JsonConverter
#value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
#value.converter=org.apache.kafka.connect.storage.StringConverter


Here is the error I'm getting when trying to connect:

[2022-10-05 17:16:42,666] INFO
 (com.solace.connector.kafka.connect.source.SolSessionHandler:194)
[2022-10-05 17:16:42,657] ERROR WorkerSourceTask{id=solaceConnectorTFMS-1} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:179)
org.apache.kafka.connect.errors.ConnectException: Failed to start topic consumer
        at com.solace.connector.kafka.connect.source.SolaceSourceTask.start(SolaceSourceTask.java:90)
        at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:199)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:750)
Caused by: ((Client name: azd-KafkaServer/10369/001f0002/VWG8IjYLiA   Local addr: 1xx.xx.xx.xx Local port: 34956   Remote addr: ems2.swim.faa.gov  Remote port: 55443) - )  com.solacesystems.jcsmp.JCSMPErrorResponseException: 403: Subscription ACL Denied - Topic 'solacetest' [Subcode:27]
        at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.createErrorResponseFromSmpFailure(TcpClientChannel.java:1049)
        at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.checkSmpResponseOK(TcpClientChannel.java:1015)
        at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel.doSmpRequest(TcpClientChannel.java:987)
        at com.solacesystems.jcsmp.impl.SessionModeSupportClient.addSubscription(SessionModeSupportClient.java:133)
        at com.solacesystems.jcsmp.impl.SessionModeSupportClient.addSubscription(SessionModeSupportClient.java:96)
        at com.solacesystems.jcsmp.impl.JCSMPBasicSession.addSubscription(JCSMPBasicSession.java:888)
        at com.solace.connector.kafka.connect.source.SolaceSourceTopicListener.init(SolaceSourceTopicListener.java:66)
        at com.solace.connector.kafka.connect.source.SolaceSourceTask.start(SolaceSourceTask.java:88)
        ... 8 more
[2022-10-05 17:16:42,668] ERROR WorkerSourceTask{id=solaceConnectorTFMS-1} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:180)
[2022-10-05 17:16:42,668] INFO ================ Shutting down PubSub+ Source Connector (com.solace


Can anybody point me in the right direction?

Best Answer

  • RobO
    RobO Member, Employee Posts: 17 Solace Employee
    edited October 2022 #2 Answer ✓

    @razvo You don't have access to the "solacetest" topic on the FAA side. In fact, SCDS does not allow access to any topics. In the details of your subscription from SWIM/SWIFT, you are given a queue name. Use that value for the sol.queue entry in the configuration file you have above.

    Also, comment out the sol.topics entry (so it reads #sol.topics), as you won't be using it.

    Please post back with your results.
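
For reference, a minimal sketch of the two entries this answer changes, assuming the rest of the connector configuration posted above stays as it is. The value `<queue-from-your-SWIFT-subscription>` is a placeholder, not a real queue name; substitute the `queue=` value from your own subscription details.

```properties
# Consume from the queue provisioned with the SWIFT subscription.
# Placeholder value: use the queue= entry from your subscription details.
sol.queue=<queue-from-your-SWIFT-subscription>

# Topic subscriptions are rejected with "Subscription ACL Denied",
# so this entry stays commented out.
#sol.topics=solacetest
```

After editing the file, the failed task has to be restarted manually, as the log above notes that it will not recover on its own.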

Answers

  • razvo
    razvo Member Posts: 2

    It worked. I appreciate your feedback.