i am getting this when tying to connect with queue
Messaging Service connected? True
<bound method Queues.SimpleDurableQueue.get_name of <solace.messaging.resources.queue.Queues.SimpleDurableQueue object at 0x7fc87abe7340>>
2022-12-08 09:41:41,744 [WARNING] solace.messaging.receiver: [_solace_utilities.py:363] Invalid datatype. Expected type: [<class ‘solace.messaging.resources.share_name.ShareName’>], but actual [<class ‘solace.messaging.resources.queue.Queues.SimpleDurableQueue’>]
Traceback (most recent call last):
File “/home/soumya_behera/spc_solace/spc_solace.py”, line 70, in
DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder()
File “/home/soumya_behera/.local/lib/python3.9/site-packages/solace/messaging/builder/_impl/_direct_message_receiver_builder.py”, line 84, in build
is_type_matches(shared_subscription_group, ShareName, logger=logger)
File “/home/soumya_behera/.local/lib/python3.9/site-packages/solace/messaging/utils/_solace_utilities.py”, line 364, in is_type_matches
raise InvalidDataTypeError(exception_message)
solace.messaging.errors.pubsubplus_client_error.InvalidDataTypeError: Invalid datatype. Expected type: [<class ‘solace.messaging.resources.share_name.ShareName’>], but actual [<class ‘solace.messaging.resources.queue.Queues.SimpleDurableQueue’>]
code
queue_name = “solace-sandbox-new-ada-queue”
durable_exclusive_queue =Queue.durable_exclusive_queue(queue_name)
print(durable_exclusive_queue.get_name)Build a receiver and bind it to the durable exclusive queue
DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder()
.build(durable_exclusive_queue)
DirectMessageReceiver.start()
it fails on DirectMessageReceiver
now i am getting this error
from solace.messaging.receiver.message_receiver import MessageHandler, InboundMessage
class MessageHandlerImpl(MessageHandler):
def on_message(self, message: InboundMessage):
source_data = message.get_payload_as_bytes()
data_array.append(source_data.decode(“utf-8”))
print(f’Messaging Service connected? {messaging_service.is_connected}')Event Handling for the messaging service
service_handler = ServiceEventHandler()
messaging_service.add_reconnection_listener(service_handler)messaging_service.add_reconnection_attempt_listener(service_handler)
messaging_service.add_service_interruption_listener(service_handler)
Queue name.
NOTE: This assumes that a persistent queue already exists on the broker with the right topic subscription
queue_name = “solace-sandbox-hm-ldw-queue”
durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name)
print(durable_exclusive_queue.get_name)
data_array =Build a receiver and bind it to the durable exclusive queue
DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \
# .build(durable_exclusive_queue)
DirectMessageReceiver.start()
PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()
.build(durable_exclusive_queue)
PersistentMessageReceiver.start()Callback for received messages
PersistentMessageReceiver.receive_async(MessageHandlerImpl)
raise InvalidDataTypeError(exception_message)
solace.messaging.errors.pubsubplus_client_error.InvalidDataTypeError: Invalid datatype. Expected type: [<class ‘solace.messaging.receiver.message_receiver.MessageHandler’>], but actual [<class ‘abc.ABCMeta’>]
Hi @nbapu934 ,
Do you need to maybe import more objects at the top?
You can see our sample for consuming from a queue imports many more items used throughout, such as from solace.messaging.resources.queue import Queue
solace-samples-python/guaranteed_subscriber.py at main · SolaceSamples/solace-samples-python
Solace PubSub+ Messaging API for Python. Join the Solace community for further discussions - solace-samples-python/guaranteed_subscriber.py at main · SolaceSamples/solace-samples-python
Hope that helps!
Hi @marc.dipasquale > can i consume data with only queue name without topic name
queue_name = “xxxxSupplychainxxxx”
durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name)
data_array =Build a receiver and bind it to the durable exclusive queue
DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \
.build(durable_exclusive_queue)
DirectMessageReceiver.start()
persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()
.build(durable_exclusive_queue)
persistent_receiver.start()
persistent_receiver.add_subscription(TopicSubscription.of("topic 1 > "))Callback for received messages
persistent_receiver.receive_async(MessageHandlerImpl())
persistent_receiver.terminate()
messaging_service.disconnect()
print (f’PERSISTENT receiver started… Bound to Queue [{durable_exclusive_queue.get_name()}]')
print(data_array)
Hi @nbapu934 ,
Yes you should be able to! Just don’t add a topic subscription.
Hope that helps,
Marc
Hi @marc.dipasquale > I want to acknowledge the message after I print it but when I am doing .ack()
it throws me the error ‘_InboundMessage’ object has no attribute ‘ack’, Can I have any suggestion on it?
class MessageHandlerImpl(MessageHandler):
def on_message(self, message: InboundMessage):
# Check if the payload is a String or Byte, decode if its the later
payload = message.get_payload_as_string() if message.get_payload_as_string() != None else message.get_payload_as_bytes()
topic = message.get_destination_name()
print(“\n” + f"Received message on: {topic}“)
print(”\n" + f"Message payload: {payload} \n")
# print(“\n” + f"Message dump: {message} \n")
message.ack()
queue_name = “xxxxSupplychainxxxx”
durable_exclusive_queue = Queue.durable_exclusive_queue(queue_name)
data_array =Build a receiver and bind it to the durable exclusive queue
DirectMessageReceiver = messaging_service.create_direct_message_receiver_builder() \
.build(durable_exclusive_queue)
DirectMessageReceiver.start()
persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()
.build(durable_exclusive_queue)
persistent_receiver.start()
persistent_receiver.add_subscription(TopicSubscription.of(“topic1”))Callback for received messages
persistent_receiver.receive_async(MessageHandlerImpl())
persistent_receiver.terminate()
messaging_service.disconnect()
print (f’PERSISTENT receiver started… Bound to Queue [{durable_exclusive_queue.get_name()}]')
print(data_array)
Hey @nbapu934 - the ack() method is executed on the persistent receiver and takes and an InboundMessage as a parameter. You can check out the docs here for more details
so with your implementation since you dont have auto_acknowledgement configured on the persistent receiver, you will have to pass an instance of the receiver to the message handler and deal with it after processing the message successfully
persistent_receiver.receive_async(MessageHandlerImpl(persistent_receiver))
class MessageHandlerImpl(MessageHandler):
def init(self, persistent_receiver: PersistentMessageReceiver):
self.persistent_receiver = persistent_receiver
def on_message(self, message: InboundMessage):
# Check if the payload is a String or Byte, decode if its the later
payload = message.get_payload_as_string() if message.get_payload_as_string() != None else message.get_payload_as_bytes()
if isinstance(payload, bytearray):
payload = payload.decode()
print(“\n” + f"Message payload: {payload} \n")
self.persistent_receiver.ack(message)
and that way when you self.persistent_receiver.ack(message) the message will be dropped from the queue. Hopefully this helps
Hi @Tamimi > thanks for the information it helps a lot one more thing to ask
try:
# Build a receiver and bind it to the durable exclusive queue
persistent_receiver: PersistentMessageReceiver = messaging_service.create_persistent_message_receiver_builder()
.build(durable_exclusive_queue)
persistent_receiver.start()
# Callback for received messages
persistent_receiver.receive_async(MessageHandlerImpl())
print(f’PERSISTENT receiver started… Bound to Queue [{durable_exclusive_queue.get_name()}]‘)
time.sleep(20)
except PubSubPlusClientError as exception:
print(f’\nMake sure queue {queue_name} exists on broker!')
I don’t want to use the time.sleep(20) any idea how I can resolve the issue? without it I can’t extract the data from the queue
Adv. Thanks
Hmm not sure why you had to add the sleep in that location? As you can see in the samples here, there is no sleep
solace-samples-python/guaranteed_subscriber.py at main · SolaceSamples/solace-samples-python
Solace PubSub+ Messaging API for Python. Join the Solace community for further discussions - solace-samples-python/guaranteed_subscriber.py at main · SolaceSamples/solace-samples-python You will have to keep your subscriber in loop to consume and extract messages.
Let me know if you want any more input on this
@Tamimi In your git link, line number 91 code is using time.sleep(1) without it code can’t extract data from the queue, correct me if I am wrong. if I am wrong please share the solution because I have tried without it, and it can’t extract data from the queue.
The sleep in the code sample is in the while loop, it’s there just for demo purposes. What you’re missing in your code sample is a while loop to keep the connection on to the broker and avoid your application exiting