🎄 Happy Holidays! 🥳
Most of Solace is closed December 24–January 1 so our employees can spend time with their families. We will re-open Thursday, January 2, 2024. Please expect slower response times during this period and open a support ticket for anything needing immediate assistance.
Happy Holidays!
Please note: most of Solace is closed December 25–January 2, and will re-open Tuesday, January 3, 2023.
i am trying to access my queue but getting error
i am getting this when tying to connect with queue
Messaging Service connected? True
<bound method Queues.SimpleDurableQueue.get_name of <solace.messaging.resources.queue.Queues.SimpleDurableQueue object at 0x7fc87abe7340>>
2022-12-08 09:41:41,744 [WARNING] solace.messaging.receiver: [_solace_utilities.py:363] Invalid datatype. Expected type: [<class 'solace.messaging.resources.share_name.ShareName'>], but actual [<class 'solace.messaging.resources.queue.Queues.SimpleDurableQueue'>]
Traceback (most recent call last):
File "/home/soumya_behera/spc_solace/spc_solace.py", line 70, in <module>
DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \
File "/home/soumya_behera/.local/lib/python3.9/site-packages/solace/messaging/builder/_impl/_direct_message_receiver_builder.py", line 84, in build
is_type_matches(shared_subscription_group, ShareName, logger=logger)
File "/home/soumya_behera/.local/lib/python3.9/site-packages/solace/messaging/utils/_solace_utilities.py", line 364, in is_type_matches
raise InvalidDataTypeError(exception_message)
solace.messaging.errors.pubsubplus_client_error.InvalidDataTypeError: Invalid datatype. Expected type: [<class 'solace.messaging.resources.share_name.ShareName'>], but actual [<class 'solace.messaging.resources.queue.Queues.SimpleDurableQueue'>]
code
queue_name = "solace-sandbox-new-ada-queue" durable_exclusive_queue =Queue.durable_exclusive_queue(queue_name) print(durable_exclusive_queue.get_name) # Build a receiver and bind it to the durable exclusive queue DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \ .build(durable_exclusive_queue) DirectMessageReceiver.start()
it fails on DirectMessageReceiver
Best Answers
-
now i am getting this error
from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage class MessageHandlerImpl(MessageHandler): def on_message(self, message: InboundMessage): source_data = message.get_payload_as_bytes() data_array.append(source_data.decode("utf-8")) print(f'Messaging Service connected? {messaging_service.is_connected}') # Event Handling for the messaging service service_handler = ServiceEventHandler() messaging_service.add_reconnection_listener(service_handler) # messaging_service.add_reconnection_attempt_listener(service_handler) # messaging_service.add_service_interruption_listener(service_handler) # Queue name. # NOTE: This assumes that a persistent queue already exists on the broker with the right topic subscription queue_name = "solace-sandbox-hm-ldw-queue" durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name) print(durable_exclusive_queue.get_name) data_array = [] # Build a receiver and bind it to the durable exclusive queue # DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \ # .build(durable_exclusive_queue) # DirectMessageReceiver.start() PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\ .build(durable_exclusive_queue) PersistentMessageReceiver.start() # Callback for received messages PersistentMessageReceiver.receive_async(MessageHandlerImpl)
raise InvalidDataTypeError(exception_message)
solace.messaging.errors.pubsubplus_client_error.InvalidDataTypeError: Invalid datatype. Expected type: [<class 'solace.messaging.receiver.message_receiver.MessageHandler'>], but actual [<class 'abc.ABCMeta'>]
0 -
Hey @nbapu934 - the
ack()
method is executed on the persistent receiver and takes and anInboundMessage
as a parameter. You can check out the docs here for more detailsso with your implementation since you dont have auto_acknowledgement configured on the persistent receiver, you will have to pass an instance of the receiver to the message handler and deal with it after processing the message successfully
persistent_receiver.receive_async(MessageHandlerImpl(persistent_receiver)) class MessageHandlerImpl(MessageHandler): def __init__(self, persistent_receiver: PersistentMessageReceiver): self.persistent_receiver = persistent_receiver def on_message(self, message: InboundMessage): # Check if the payload is a String or Byte, decode if its the later payload = message.get_payload_as_string() if message.get_payload_as_string() != None else message.get_payload_as_bytes() if isinstance(payload, bytearray): payload = payload.decode() print("\n" + f"Message payload: {payload} \n") self.persistent_receiver.ack(message)
and that way when you
self.persistent_receiver.ack(message)
the message will be dropped from the queue. Hopefully this helps0
Answers
-
now i am getting this error
from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage class MessageHandlerImpl(MessageHandler): def on_message(self, message: InboundMessage): source_data = message.get_payload_as_bytes() data_array.append(source_data.decode("utf-8")) print(f'Messaging Service connected? {messaging_service.is_connected}') # Event Handling for the messaging service service_handler = ServiceEventHandler() messaging_service.add_reconnection_listener(service_handler) # messaging_service.add_reconnection_attempt_listener(service_handler) # messaging_service.add_service_interruption_listener(service_handler) # Queue name. # NOTE: This assumes that a persistent queue already exists on the broker with the right topic subscription queue_name = "solace-sandbox-hm-ldw-queue" durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name) print(durable_exclusive_queue.get_name) data_array = [] # Build a receiver and bind it to the durable exclusive queue # DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \ # .build(durable_exclusive_queue) # DirectMessageReceiver.start() PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\ .build(durable_exclusive_queue) PersistentMessageReceiver.start() # Callback for received messages PersistentMessageReceiver.receive_async(MessageHandlerImpl)
raise InvalidDataTypeError(exception_message)
solace.messaging.errors.pubsubplus_client_error.InvalidDataTypeError: Invalid datatype. Expected type: [<class 'solace.messaging.receiver.message_receiver.MessageHandler'>], but actual [<class 'abc.ABCMeta'>]
0 -
Hi @nbapu934,
Do you need to maybe import more objects at the top?
You can see our sample for consuming from a queue imports many more items used throughout, such as `from solace.messaging.resources.queue import Queue`
Hope that helps!
0 -
Hi @marc can i consume data with only queue name without topic name
queue_name = "xxxxSupplychainxxxx" durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name) data_array = [] # Build a receiver and bind it to the durable exclusive queue # DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \ # .build(durable_exclusive_queue) # DirectMessageReceiver.start() persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\ .build(durable_exclusive_queue) persistent_receiver.start() persistent_receiver.add_subscription(TopicSubscription.of("topic1")) # Callback for received messages persistent_receiver.receive_async(MessageHandlerImpl()) persistent_receiver.terminate() messaging_service.disconnect() print (f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]') print(data_array)
0 -
Hi @marc I want to acknowledge the message after I print it but when I am doing .ack()
it throws me the error '_InboundMessage' object has no attribute 'ack', Can I have any suggestion on it?
class MessageHandlerImpl(MessageHandler): def on_message(self, message: InboundMessage): # Check if the payload is a String or Byte, decode if its the later payload = message.get_payload_as_string() if message.get_payload_as_string() != None else message.get_payload_as_bytes() topic = message.get_destination_name() print("\n" + f"Received message on: {topic}") print("\n" + f"Message payload: {payload} \n") # print("\n" + f"Message dump: {message} \n") message.ack() queue_name = "xxxxSupplychainxxxx" durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name) data_array = [] # Build a receiver and bind it to the durable exclusive queue # DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \ # .build(durable_exclusive_queue) # DirectMessageReceiver.start() persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\ .build(durable_exclusive_queue) persistent_receiver.start() persistent_receiver.add_subscription(TopicSubscription.of("topic1")) # Callback for received messages persistent_receiver.receive_async(MessageHandlerImpl()) persistent_receiver.terminate() messaging_service.disconnect() print (f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]') print(data_array)
0 -
Hey @nbapu934 - the
ack()
method is executed on the persistent receiver and takes and anInboundMessage
as a parameter. You can check out the docs here for more detailsso with your implementation since you dont have auto_acknowledgement configured on the persistent receiver, you will have to pass an instance of the receiver to the message handler and deal with it after processing the message successfully
persistent_receiver.receive_async(MessageHandlerImpl(persistent_receiver)) class MessageHandlerImpl(MessageHandler): def __init__(self, persistent_receiver: PersistentMessageReceiver): self.persistent_receiver = persistent_receiver def on_message(self, message: InboundMessage): # Check if the payload is a String or Byte, decode if its the later payload = message.get_payload_as_string() if message.get_payload_as_string() != None else message.get_payload_as_bytes() if isinstance(payload, bytearray): payload = payload.decode() print("\n" + f"Message payload: {payload} \n") self.persistent_receiver.ack(message)
and that way when you
self.persistent_receiver.ack(message)
the message will be dropped from the queue. Hopefully this helps0 -
Hi @Tamimi thanks for the information it helps a lot one more thing to ask
try: # Build a receiver and bind it to the durable exclusive queue persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()\ .build(durable_exclusive_queue) persistent_receiver.start() # Callback for received messages persistent_receiver.receive_async(MessageHandlerImpl()) print(f'PERSISTENT receiver started... Bound to Queue [{durable_exclusive_queue.get_name()}]') time.sleep(20) except PubSubPlusClientError as exception: print(f'\nMake sure queue {queue_name} exists on broker!')
I don't want to use the time.sleep(20) any idea how I can resolve the issue? without it I can't extract the data from the queue
Adv. Thanks
0 -
Hmm not sure why you had to add the sleep in that location? As you can see in the samples here, there is no sleep
You will have to keep your subscriber in loop to consume and extract messages.
Let me know if you want any more input on this
0