i am trying to access my queue but getting error

nbapu934
nbapu934 Member Posts: 6
edited December 2022 in Connectors & Integrations #1

i am getting this when tying to connect with queue

Messaging Service connected? True

<bound method Queues.SimpleDurableQueue.get_name of <solace.messaging.resources.queue.Queues.SimpleDurableQueue object at 0x7fc87abe7340>>

2022-12-08 09:41:41,744 [WARNING] solace.messaging.receiver: [_solace_utilities.py:363] Invalid datatype. Expected type: [<class 'solace.messaging.resources.share_name.ShareName'>], but actual [<class 'solace.messaging.resources.queue.Queues.SimpleDurableQueue'>]

Traceback (most recent call last):

 File "/home/soumya_behera/spc_solace/spc_solace.py", line 70, in <module>

  DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \

 File "/home/soumya_behera/.local/lib/python3.9/site-packages/solace/messaging/builder/_impl/_direct_message_receiver_builder.py", line 84, in build

  is_type_matches(shared_subscription_group, ShareName, logger=logger)

 File "/home/soumya_behera/.local/lib/python3.9/site-packages/solace/messaging/utils/_solace_utilities.py", line 364, in is_type_matches

  raise InvalidDataTypeError(exception_message)

solace.messaging.errors.pubsubplus_client_error.InvalidDataTypeError: Invalid datatype. Expected type: [<class 'solace.messaging.resources.share_name.ShareName'>], but actual [<class 'solace.messaging.resources.queue.Queues.SimpleDurableQueue'>]

code

queue_name = "solace-sandbox-new-ada-queue"
durable_exclusive_queue =Queue.durable_exclusive_queue(queue_name)
print(durable_exclusive_queue.get_name)
# Build a receiver and bind it to the durable exclusive queue
DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \
    .build(durable_exclusive_queue)
DirectMessageReceiver.start()

it fails on DirectMessageReceiver

Best Answers

  • nbapu934
    nbapu934 Member Posts: 6
    edited December 2022 #2 Answer ✓

    now i am getting this error

    from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage
    class MessageHandlerImpl(MessageHandler):
        def on_message(self, message: InboundMessage):
            source_data = message.get_payload_as_bytes()
            data_array.append(source_data.decode("utf-8"))
    print(f'Messaging Service connected? {messaging_service.is_connected}')
    # Event Handling for the messaging service
    service_handler = ServiceEventHandler()
    messaging_service.add_reconnection_listener(service_handler)
    # messaging_service.add_reconnection_attempt_listener(service_handler)
    # messaging_service.add_service_interruption_listener(service_handler)
    # Queue name.
    # NOTE: This assumes that a persistent queue already exists on the broker with the right topic subscription
    queue_name = "solace-sandbox-hm-ldw-queue"
    durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name)
    print(durable_exclusive_queue.get_name)
    data_array = []
    # Build a receiver and bind it to the durable exclusive queue
    # DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \
    #     .build(durable_exclusive_queue)
    # DirectMessageReceiver.start()
    PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\
        .build(durable_exclusive_queue)
    PersistentMessageReceiver.start()
    # Callback for received messages
    PersistentMessageReceiver.receive_async(MessageHandlerImpl)
    

      raise InvalidDataTypeError(exception_message)

    solace.messaging.errors.pubsubplus_client_error.InvalidDataTypeError: Invalid datatype. Expected type: [<class 'solace.messaging.receiver.message_receiver.MessageHandler'>], but actual [<class 'abc.ABCMeta'>]

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin
    #3 Answer ✓

    Hi @nbapu934,

    Yes you should be able to! Just don't add a topic subscription.

    Hope that helps,

    Marc

  • Tamimi
    Tamimi Member, Administrator, Employee Posts: 491 admin
    #4 Answer ✓

    Hey @nbapu934 - the ack() method is executed on the persistent receiver and takes and an InboundMessage as a parameter. You can check out the docs here for more details

    so with your implementation since you dont have auto_acknowledgement configured on the persistent receiver, you will have to pass an instance of the receiver to the message handler and deal with it after processing the message successfully


    persistent_receiver.receive_async(MessageHandlerImpl(persistent_receiver))
    
    class MessageHandlerImpl(MessageHandler):
        def __init__(self, persistent_receiver: PersistentMessageReceiver):
            self.persistent_receiver = persistent_receiver
    
        def on_message(self, message: InboundMessage):
            # Check if the payload is a String or Byte, decode if its the later
            payload = message.get_payload_as_string() if message.get_payload_as_string() != None else message.get_payload_as_bytes()
            if isinstance(payload, bytearray):
                payload = payload.decode()
    
            print("\n" + f"Message payload: {payload} \n")
            self.persistent_receiver.ack(message)
    

    and that way when you self.persistent_receiver.ack(message) the message will be dropped from the queue. Hopefully this helps

Answers

  • nbapu934
    nbapu934 Member Posts: 6
    edited December 2022 #5 Answer ✓

    now i am getting this error

    from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage
    class MessageHandlerImpl(MessageHandler):
        def on_message(self, message: InboundMessage):
            source_data = message.get_payload_as_bytes()
            data_array.append(source_data.decode("utf-8"))
    print(f'Messaging Service connected? {messaging_service.is_connected}')
    # Event Handling for the messaging service
    service_handler = ServiceEventHandler()
    messaging_service.add_reconnection_listener(service_handler)
    # messaging_service.add_reconnection_attempt_listener(service_handler)
    # messaging_service.add_service_interruption_listener(service_handler)
    # Queue name.
    # NOTE: This assumes that a persistent queue already exists on the broker with the right topic subscription
    queue_name = "solace-sandbox-hm-ldw-queue"
    durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name)
    print(durable_exclusive_queue.get_name)
    data_array = []
    # Build a receiver and bind it to the durable exclusive queue
    # DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \
    #     .build(durable_exclusive_queue)
    # DirectMessageReceiver.start()
    PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\
        .build(durable_exclusive_queue)
    PersistentMessageReceiver.start()
    # Callback for received messages
    PersistentMessageReceiver.receive_async(MessageHandlerImpl)
    

      raise InvalidDataTypeError(exception_message)

    solace.messaging.errors.pubsubplus_client_error.InvalidDataTypeError: Invalid datatype. Expected type: [<class 'solace.messaging.receiver.message_receiver.MessageHandler'>], but actual [<class 'abc.ABCMeta'>]

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @nbapu934,

    Do you need to maybe import more objects at the top?

    You can see our sample for consuming from a queue imports many more items used throughout, such as `from solace.messaging.resources.queue import Queue`


    Hope that helps!

  • nbapu934
    nbapu934 Member Posts: 6

    Hi @marc can i consume data with only queue name without topic name

    queue_name = "xxxxSupplychainxxxx"
    durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name)
    data_array = []
    # Build a receiver and bind it to the durable exclusive queue
    # DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \
    #     .build(durable_exclusive_queue)
    # DirectMessageReceiver.start()
    persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\
        .build(durable_exclusive_queue)
    persistent_receiver.start()
    persistent_receiver.add_subscription(TopicSubscription.of("topic1"))
    # Callback for received messages
    persistent_receiver.receive_async(MessageHandlerImpl())
    persistent_receiver.terminate()
    messaging_service.disconnect()
    print (f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]')
    print(data_array)
    
  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin
    #8 Answer ✓

    Hi @nbapu934,

    Yes you should be able to! Just don't add a topic subscription.

    Hope that helps,

    Marc

  • nbapu934
    nbapu934 Member Posts: 6

    Hi @marc I want to acknowledge the message after I print it but when I am doing .ack()

    it throws me the error '_InboundMessage' object has no attribute 'ack', Can I have any suggestion on it?

    class MessageHandlerImpl(MessageHandler):
        def on_message(self, message: InboundMessage):
            # Check if the payload is a String or Byte, decode if its the later
            payload = message.get_payload_as_string() if message.get_payload_as_string() != None else message.get_payload_as_bytes()
            topic = message.get_destination_name()
            print("\n" + f"Received message on: {topic}")
            print("\n" + f"Message payload: {payload} \n")
            # print("\n" + f"Message dump: {message} \n")
            message.ack()
    queue_name = "xxxxSupplychainxxxx"
    durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name)
    data_array = []
    # Build a receiver and bind it to the durable exclusive queue
    # DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \
    #     .build(durable_exclusive_queue)
    # DirectMessageReceiver.start()
    persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\
        .build(durable_exclusive_queue)
    persistent_receiver.start()
    persistent_receiver.add_subscription(TopicSubscription.of("topic1"))
    # Callback for received messages
    persistent_receiver.receive_async(MessageHandlerImpl())
    persistent_receiver.terminate()
    messaging_service.disconnect()
    print (f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]')
    print(data_array)
    


  • Tamimi
    Tamimi Member, Administrator, Employee Posts: 491 admin
    #10 Answer ✓

    Hey @nbapu934 - the ack() method is executed on the persistent receiver and takes and an InboundMessage as a parameter. You can check out the docs here for more details

    so with your implementation since you dont have auto_acknowledgement configured on the persistent receiver, you will have to pass an instance of the receiver to the message handler and deal with it after processing the message successfully


    persistent_receiver.receive_async(MessageHandlerImpl(persistent_receiver))
    
    class MessageHandlerImpl(MessageHandler):
        def __init__(self, persistent_receiver: PersistentMessageReceiver):
            self.persistent_receiver = persistent_receiver
    
        def on_message(self, message: InboundMessage):
            # Check if the payload is a String or Byte, decode if its the later
            payload = message.get_payload_as_string() if message.get_payload_as_string() != None else message.get_payload_as_bytes()
            if isinstance(payload, bytearray):
                payload = payload.decode()
    
            print("\n" + f"Message payload: {payload} \n")
            self.persistent_receiver.ack(message)
    

    and that way when you self.persistent_receiver.ack(message) the message will be dropped from the queue. Hopefully this helps

  • nbapu934
    nbapu934 Member Posts: 6

    Hi @Tamimi thanks for the information it helps a lot one more thing to ask

    try:
      # Build a receiver and bind it to the durable exclusive queue
      persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\
                .build(durable_exclusive_queue)
      persistent_receiver.start()
    
    
      # Callback for received messages
      persistent_receiver.receive_async(MessageHandlerImpl())
      
      print(f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]')
      time.sleep(20)
    except PubSubPlusClientError as exception:
      print(f'\nMake sure queue {queue_name} exists on broker!')
    

    I don't want to use the time.sleep(20) any idea how I can resolve the issue? without it I can't extract the data from the queue

    Adv. Thanks

  • Tamimi
    Tamimi Member, Administrator, Employee Posts: 491 admin

    Hmm not sure why you had to add the sleep in that location? As you can see in the samples here, there is no sleep

    You will have to keep your subscriber in loop to consume and extract messages.


    Let me know if you want any more input on this

  • nbapu934
    nbapu934 Member Posts: 6

    @Tamimi In your git link, line number 91 code is using time.sleep(1) without it code can't extract data from the queue, correct me if I am wrong. if I am wrong please share the solution because I have tried without it, and it can't extract data from the queue.

  • Tamimi
    Tamimi Member, Administrator, Employee Posts: 491 admin

    The sleep in the code sample is in the while loop, it's there just for demo purposes. What you're missing in your code sample is a while loop to keep the connection on to the broker and avoid your application exiting