Python Proton AMQPS Transport Error
Good afternoon all,
I am seeing some odd behavior using a variant of the Solace demo code for a Python/Proton AMQPS connection: https://github.com/SolaceSamples/solace-samples-amqp-qpid-proton-python/blob/master/src/simple_recv.py
Using the approach shown in that sample, but with a PEM key/password, I can successfully connect to a queue for polling. However, every 50s the connection disconnects, I get an unknown error, and then it resumes.
The code is as follows:
import proton
from proton import SSLDomain
from proton._reactor import Container
from proton.handlers import MessagingHandler
from datetime import datetime

url = "amqps://xxxxxxxxxxxxxx:xxxx"
client_certificate_path = "./xxxxxxxxxxxx.pem"
client_certificate_password = "xxxxxxxxxxxxxxx"
queue = "xxxxx_xxxxx"
connectionname = "xxx/xxx/xxx"
messagesnumber = 0
autoacknowledgment = False


class PythonAmqpQueueConsumer(MessagingHandler):
    def __init__(self):
        super(PythonAmqpQueueConsumer, self).__init__(auto_accept=autoacknowledgment)
        self.url = url
        self.queue = queue
        self.received = 0
        self.expected = messagesnumber
        self.name = connectionname
        self.certificate_path = client_certificate_path
        self.certificate_password = client_certificate_password

    def build_ssl_domain(self):
        ssl_certificate = self.certificate_path
        ssl_certificate_password = self.certificate_password
        ssl_disable_peer_name_verify = False
        domain = SSLDomain(SSLDomain.MODE_CLIENT)
        if ssl_certificate:
            domain.set_credentials(str(ssl_certificate), str(ssl_certificate), str(ssl_certificate_password))
        return domain

    def on_start(self, event):
        #if self.expected > 0:
        print(datetime.now(), "Creating subscriber")
        domain = self.build_ssl_domain()
        conn = event.container.connect(url=self.url, allow_insecure_mechs=False, ssl_domain=domain)
        if conn:
            # attaches receiver link to receive messages
            event.container.create_receiver(context=conn, source=self.queue, name=self.name)
            print(datetime.now(), "Receiver successfully created")

    def on_message(self, event):
        print(datetime.now(), "New message!")
        if event.message.id and event.message.id < self.received:
            if autoacknowledgment is False:
                event.delivery.update(event.delivery.ACCEPTED)
            # ignore duplicate message
            return
        if self.expected == 0 or self.received < self.expected:
            print(datetime.now(), "Message received from {}: {}".format(event.message.address, event.message.body))
            print(datetime.now(), "{} messages left.\n".format(self.expected - self.received - 1))
            self.received += 1
            if autoacknowledgment is False:
                event.delivery.update(event.delivery.ACCEPTED)
            if self.received == self.expected:
                event.receiver.close()
                event.connection.close()
                print("{} messages received. Closing connection.".format(self.received))

    def on_disconnected(self, event):
        print(event)
        print(datetime.now(), "Disconnected")

    # receives socket or authentication failures
    def on_transport_error(self, event):
        print(datetime.now(), "Transport failure for amqp broker:", self.url, "Error:", event.transport.condition)
        MessagingHandler.on_transport_error(self, event)


try:
    print(datetime.now(), '%s' % 'SSL present' if proton.SSL.present() else 'SSL NOT AVAILABLE')
    Container(PythonAmqpQueueConsumer()).run()
except KeyboardInterrupt:
    pass
The errors are PN_TRANSPORT_TAIL_CLOSED followed by PN_TRANSPORT_CLOSED:
2022-01-31 14:26:30.782634 SSL present
2022-01-31 14:26:30.794141 Creating subscriber
2022-01-31 14:26:30.801938 Receiver successfully created
PN_TRANSPORT_TAIL_CLOSED(<proton._transport.Transport 0x7fee3bcd71c0 ~ 0x21255a0>)
2022-01-31 14:27:20.964299 Disconnected
PN_TRANSPORT_CLOSED(<proton._transport.Transport 0x7fee3bcd7df0 ~ 0x21255a0>)
2022-01-31 14:27:20.964429 Disconnected
2022-01-31 14:27:20.964671 Transport failure for amqp broker: amqps://xxxxxxxx:xxxx Error: Condition('amqp:connection:framing-error', 'SSL Failure: Unknown error')
PN_TRANSPORT_TAIL_CLOSED(<proton._transport.Transport 0x7fee3bcd7df0 ~ 0x213a530>)
2022-01-31 14:28:11.120901 Disconnected
PN_TRANSPORT_CLOSED(<proton._transport.Transport 0x7fee3bcd7dc0 ~ 0x213a530>)
2022-01-31 14:28:11.121156 Disconnected
2022-01-31 14:28:11.121474 Transport failure for amqp broker: amqps://xxxxxxxx:xxxx Error: Condition('amqp:connection:framing-error', 'SSL Failure: Unknown error')
As it occurs every 50s, I wondered whether it was caused by a timeout setting, but looking at the API I have not been able to find any option to configure this.
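For reference, the 50s figure can be checked directly from the log timestamps. This is a minimal sketch using only the standard library; the three timestamps are copied from the output above, and the helper name intervals_seconds is just for illustration:

```python
from datetime import datetime

# Timestamps copied from the log above.
stamps = [
    "2022-01-31 14:26:30.801938",  # Receiver successfully created
    "2022-01-31 14:27:20.964671",  # first "Transport failure" line
    "2022-01-31 14:28:11.121474",  # second "Transport failure" line
]


def intervals_seconds(timestamps):
    """Return the gaps, in seconds, between consecutive timestamps."""
    parsed = [datetime.strptime(t, "%Y-%m-%d %H:%M:%S.%f") for t in timestamps]
    return [(later - earlier).total_seconds()
            for earlier, later in zip(parsed, parsed[1:])]


gaps = intervals_seconds(stamps)
print([round(g, 2) for g in gaps])
```

Both gaps come out at just over 50 seconds, so the period is measured from the moment the receiver is created rather than from the last message.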
Any help greatly appreciated.
Thank you
Answers
-
Hi @stushep,
Sorry this hasn't been answered yet - the answer doesn't seem to be obvious.
Can you check a few things:
- Is the client actually connected? Once you start the app you should see it show up as a client in PubSub+ Manager
- If it is connected and you publish a message to the topic it's subscribed to, does it ever get received?
- Can you check the ACL and event logs on the broker itself? That should give us some hints.
-
Hi @marc, thank you for your response.
The connection is unfortunately to a corporate Solace account where I am unable to log in and see the server status myself.
What I have figured out so far is this:
The connection to the queue is successful. The first batch is downloaded, and then I believe the error occurs. After 50s the connection times out, disconnects and reconnects, and the consumer downloads the next batch, then errors again, in a loop.
Running against my own Solace Cloud test service without SSL, the Python code works perfectly: it consumes all messages and stays connected, which points to an SSL-related problem.
I have also raised the concern with the corporate account team, who will hopefully get in touch with Solace support that way, but my hands are pretty tied.
Are you aware of any specific SSL/TLS protocol versions that must be used, or of any other SSL blockers?
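One way to see which TLS version and cipher actually get negotiated, independent of Proton, is Python's standard ssl module. This is only a rough sketch: the function names are made up for illustration, the host and port are placeholders to be supplied by the caller, it assumes the .pem file holds both the certificate and the private key (as the set_credentials call in the consumer implies), and it assumes the broker's certificate chains to a CA the local machine already trusts.

```python
import socket
import ssl


def make_client_context(certfile=None, password=None,
                        min_version=ssl.TLSVersion.TLSv1_2):
    """Build a verifying client context that refuses anything older than min_version."""
    ctx = ssl.create_default_context()
    ctx.minimum_version = min_version
    if certfile:
        # Assumption: one PEM file containing both certificate and private key.
        ctx.load_cert_chain(certfile, password=password)
    return ctx


def negotiated_tls(host, port, certfile=None, password=None, timeout=10.0):
    """Open a TLS connection and return (protocol version, cipher name)."""
    ctx = make_client_context(certfile, password)
    with socket.create_connection((host, port), timeout=timeout) as raw:
        with ctx.wrap_socket(raw, server_hostname=host) as tls:
            return tls.version(), tls.cipher()[0]


# Local information only; no connection is made here.
print("OpenSSL in use:", ssl.OPENSSL_VERSION)
print("Minimum TLS version:", make_client_context().minimum_version.name)
```

Calling negotiated_tls with the corporate broker's host and AMQPS port, plus the client .pem and its password, should return a protocol string and a cipher name. A handshake exception at this stage would point at the TLS setup itself rather than at Proton.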
Thank you for any support :)
-
Hi @stushep,
My gut says it has something to do with a proxy/load balancer/firewall in between your machine and the Event Broker. SSL/TLS issues usually show up immediately, and you would not be able to connect and consume messages at all. It is probably something to do with SSL offloading, being load balanced to a different broker, or something preventing your return traffic from reaching the broker.
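If an idle timeout somewhere along the path is involved, one thing that may be worth trying is asking Proton for heartbeats. Container.connect() accepts a heartbeat argument (in seconds); the sketch below wraps that call. The helper names and the 20 second value are assumptions for illustration (just something comfortably under the observed 50s), and there is no guarantee this addresses the SSL failure.

```python
def connect_with_heartbeat(container, url, ssl_domain, heartbeat_seconds=20):
    """Call container.connect() with the same options as the consumer's
    on_start, plus a heartbeat interval.

    heartbeat_seconds=20 is an arbitrary illustrative value, not a
    recommendation from Solace or the Proton project.
    """
    return container.connect(
        url=url,
        ssl_domain=ssl_domain,
        allow_insecure_mechs=False,
        heartbeat=heartbeat_seconds,
    )


def idle_timeouts(connection):
    """Return (local, remote) idle timeouts from the connection's transport.

    Intended to be called once the connection is open, for example from an
    on_connection_opened handler, to see what the broker advertises.
    """
    transport = connection.transport
    return transport.idle_timeout, transport.remote_idle_timeout
```

In the consumer this would replace the event.container.connect(...) call in on_start with connect_with_heartbeat(event.container, self.url, domain). Printing idle_timeouts(event.connection) from an on_connection_opened method would show whether either side has negotiated a timeout near 50 seconds.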
Unfortunately I'm just guessing at random stuff and the route you're taking to go back through the Corporate team + support will be most fruitful.
Hope you have a good weekend!
-
Hello,
Could you try adding handler="default" to the create_receiver call:
def on_start(self, event):
    #if self.expected > 0:
    print(datetime.now(), "Creating subscriber")
    domain = self.build_ssl_domain()
    conn = event.container.connect(url=self.url, allow_insecure_mechs=False, ssl_domain=domain)
    if conn:
        # attaches receiver link to receive messages
        event.container.create_receiver(context=conn, source=self.queue, name=self.name, handler="default")
        print(datetime.now(), "Receiver successfully created")
You never know … Kind regards