Python Proton AMQPS Transport Error

stushep
stushep Member Posts: 20

Good afternoon all,

Am having some odd behavior using a varient of Solace demo code for Python/Proton AMQPS connection: https://github.com/SolaceSamples/solace-samples-amqp-qpid-proton-python/blob/master/src/simple_recv.py

Using the methodology shown in this case, but with a pem key/password i can successfully connect to a queue for polling. However every 50s the connection disconnects, i get an unknown error, and then it resumes.

The code is as follows:

import proton
from proton import SSLDomain
from proton._reactor import Container
from proton.handlers import MessagingHandler
from datetime import datetime

url = "amqps://xxxxxxxxxxxxxx:xxxx"
client_certificate_path = "./xxxxxxxxxxxx.pem"
client_certificate_password = "xxxxxxxxxxxxxxx"
queue = "xxxxx_xxxxx"
connectionname = "xxx/xxx/xxx"
messagesnumber = 0
autoacknowledgment = False

class PythonAmqpQueueConsumer(MessagingHandler):
  def __init__(self):
    super(PythonAmqpQueueConsumer, self).__init__(auto_accept=autoacknowledgment)
    self.url = url
    self.queue = queue
    self.received = 0
    self.expected = messagesnumber
    self.name = connectionname
     
    self.certificate_path = client_certificate_path
    self.certificate_password = client_certificate_password

  def build_ssl_domain(self):
    ssl_certificate = self.certificate_path
    ssl_certificate_password = self.certificate_password
    ssl_disable_peer_name_verify = False

    domain = SSLDomain(SSLDomain.MODE_CLIENT)

    if ssl_certificate:
      domain.set_credentials(str(ssl_certificate), str(ssl_certificate), str(ssl_certificate_password))
    return domain

  def on_start(self, event):
    #if self.expected > 0:
    print(datetime.now(), "Creating subscriber")
    domain = self.build_ssl_domain()
    conn = event.container.connect(url=self.url, allow_insecure_mechs=False, ssl_domain=domain)
    if conn:
      # attaches receiver link to receive messages
      event.container.create_receiver(context=conn, source=self.queue, name=self.name)
      print(datetime.now(), "Receiver successfully created")

  def on_message(self, event):
    print(datetime.now(), "New message!")
    if event.message.id and event.message.id < self.received:
      if autoacknowledgment is False:
        event.delivery.update(event.delivery.ACCEPTED)
      # ignore duplicate message
      return
    if self.expected == 0 or self.received < self.expected:
      print(datetime.now(), "Message received from {}: {}".format(event.message.address, event.message.body))
      print(datetime.now(), "{} messages left.\n".format(self.expected - self.received - 1))
      self.received += 1
      if autoacknowledgment is False:
        event.delivery.update(event.delivery.ACCEPTED)
      if self.received == self.expected:
        event.receiver.close()
        event.connection.close()
        print("{} messages received. Closing connection.".format(self.received))

  def on_disconnected(self, event):
    print(event)
    print(datetime.now(), "Disconnected")

  # receives socket or authentication failures
  def on_transport_error(self, event):
    print(datetime.now(), "Transport failure for amqp broker:", self.url, "Error:", event.transport.condition)
    MessagingHandler.on_transport_error(self, event)

try:
  print(datetime.now(), '%s' % 'SSL present' if proton.SSL.present() else 'SSL NOT AVAILABLE')
  Container(PythonAmqpQueueConsumer()).run()
except KeyboardInterrupt:
  pass

The errors, which are:

PN_TRANSPORT_TAIL_CLOSED followed by PN_TRANSPORT_CLOSED


2022-01-31 14:26:30.782634 SSL present

2022-01-31 14:26:30.794141 Creating subscriber

2022-01-31 14:26:30.801938 Receiver successfully created

PN_TRANSPORT_TAIL_CLOSED(<proton._transport.Transport 0x7fee3bcd71c0 ~ 0x21255a0>)

2022-01-31 14:27:20.964299 Disconnected

PN_TRANSPORT_CLOSED(<proton._transport.Transport 0x7fee3bcd7df0 ~ 0x21255a0>)

2022-01-31 14:27:20.964429 Disconnected

2022-01-31 14:27:20.964671 Transport failure for amqp broker: amqps://xxxxxxxx:xxxx Error: Condition('amqp:connection:framing-error', 'SSL Failure: Unknown error')

PN_TRANSPORT_TAIL_CLOSED(<proton._transport.Transport 0x7fee3bcd7df0 ~ 0x213a530>)

2022-01-31 14:28:11.120901 Disconnected

PN_TRANSPORT_CLOSED(<proton._transport.Transport 0x7fee3bcd7dc0 ~ 0x213a530>)

2022-01-31 14:28:11.121156 Disconnected

2022-01-31 14:28:11.121474 Transport failure for amqp broker: amqps://xxxxxxxx:xxxx Error: Condition('amqp:connection:framing-error', 'SSL Failure: Unknown error')

As it occurs every 50s, i wondered if it were a timeout option, but looking at the api i am currently unable to find any options or settings to set this.

Any help greatly appreciated.

Thank you

Tagged:

Answers

  • stushep
    stushep Member Posts: 20

    *** bump *** anyone know?

  • stushep
    stushep Member Posts: 20

    Still an open question :(

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 973 admin

    Hi @stushep,

    Sorry this hasn't been answered yet - the answer doesn't seem to be obvious.

    Can you check a few things:

    1. Is the client actually connected? Once you start the app you should see it show up as a client in PubSub+ Manager
    2. If it is connected and you publish a message tot he topic it's subscribing on does it ever get received?
    3. Can you see what is the acl and event logs on the broker itself? That should give us some hints.


  • stushep
    stushep Member Posts: 20

    Hi @marc , Thank you for your response.

    The connection is unfortunately to a corporate Solace account where i am unable to login and see the server status myself.

    The logic i have figured out so far is that:

    Connection to the queue = successful, The first batch is downloaded and then i believe the error occurs. After 50s this error times out, disconnects, reconnects and the consumer downloads the next batch, then errors, looping this.

    Running on my own solace cloud test without SSL, the python code works perfectly and it consumes all messages and stays connected, so confirms its an SSL error as such.

    Have raised the concern also with the corporate account team who will hopefully be in touch with Solace support that way, however hands are pretty tied.

    Are you aware if there are specific SSL protocols or TLS which must be used potentially or some other SSL blockers ?

    Thank you for any support :)

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 973 admin

    Hi @stushep,

    My gut says it has something to do with a proxy/load balancer/firewall in between your machine and the Event Broker. Usually SSL/TLS issues happen immediately and you would not be authorized to connect and consume messages. Probably something to do with SSL offloading or being load balanced to a different broker or something preventing your return traffic from reaching the broker.

    Unfortunately I'm just guessing at random stuff and the route you're taking to go back through the Corporate team + support will be most fruitful.

    Hope you have a good weekend!

  • coz787
    coz787 Member Posts: 6

    Hello,

    Could you try this : handler="default" added to the create_receiver

    def on_start(self, event):
        #if self.expected > 0:
        print(datetime.now(), "Creating subscriber")
        domain = self.build_ssl_domain()
        conn = event.container.connect(url=self.url, allow_insecure_mechs=False, ssl_domain=domain)
        if conn:
          # attaches receiver link to receive messages
          event.container.create_receiver(context=conn, source=self.queue, name=self.name, handler="default")
          print(datetime.now(), "Receiver successfully created")
    

    we never know … k.regards