Hello everyone! Trying to publish a bytearray instead of an outbound message object on the direct publisher but it seems nothing is being published / subscriber is mishandling the received message. Can’t seem to figure out which.
Here is my publisher script:
def publish_one(self, msg, _topic):
try:
topic = Topic.of(TOPIC_PREFIX + _topic)# # msg_builder # outbound_msg = self.outbound_msg_builder\ # .with_application_message_id(f'{_topic}') \ # .build(f'{msg}') # msg is a byte string, converted to a byte array outbound_msg = bytearray(msg) self.direct_publisher.publish(destination=topic, message=outbound_msg) except KeyboardInterrupt: print('\nTerminating Publisher') self.direct_publisher.terminate() print('\nDisconnecting Messaging Service') self.msg_service.disconnect()
On a specific topic, I want to retrieve the bytearrays published and decode them using my decoder. I intend to convert the bytearray back to a bytestring then decode it on a separete script. Here is my subscriber message handler script, temporarily printing it out on the terminal.
class MessageHandlerImpl(MessageHandler):
def on_message(self, message):
b = ‘’.join(chr(x) for x in message)
print(f’\n Message dump: {message_dump}')
Would appreciate the help! Thank you!
Hey @sheanne ,
I was able to run a simple script locally and it worked with passing a byte array. A couple of questions:
What is the encoding that you are passing to bytearray in outbound_msg = bytearray(msg) ? In your MessageHandlerImpl , what do you get if you dump the message directly print (f’\n Message dump : {message}') (instead of using the message_dump variable)? You can also get the payload as a byte in your MessageHandlerImpl > as follows
class MessageHandlerImpl(MessageHandler):
def on_message(self, message: InboundMessage): payload_byte = message.get_payload_as_bytes() # print("\n" + f"Message Payload String: {message.get_payload_as_string()} \n") # print("\n" + f"Message Topic: {message.get_destination_name()} \n") print("\n" + f"Message dump: {message} \n") print("\n" + f"Message byte: {payload_byte} \n")
Running the following I am able to dump the message and the byte payload
Build a Receiver with the given topics and start it
direct_receiver = messaging_service.create_direct_message_receiver_builder()
.with_subscriptions(TopicSubscription.of(“test/>”))
.build()
direct_receiver.start()
direct_receiver.receive_async(MessageHandlerImpl())Create a direct message publisher and start it
direct_publisher = messaging_service.create_direct_message_publisher_builder().build()
Blocking Start thread
direct_publisher.start()
topic = Topic.of(“test/topic”)
outbound_msg = bytearray(“Hello world”, encoding=‘utf8’)
try:
try:
while True:
direct_publisher.publish(destination=topic, message=outbound_msg)
time.sleep(1)
except KeyboardInterrupt:
print(‘\nDisconnecting Messaging Service’)
finally:
print(‘\nTerminating receiver’)
direct_receiver.terminate()
print(‘\nTerminating Publisher’)
direct_publisher.terminate()
print(‘\nDisconnecting Messaging Service’)
messaging_service.disconnect()
Hey @Tamimi > ,
The message is a json dictionary encoded using ASCII through simple binary encoding.
encoded message bytestring, encoded using ASCII through simple binary encoding
msg = ‘x00Deposit\x00\x00\x00ftx\x00\x00\x00\x00\x00\x00\x00USD\x00obHN&`C@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002022-01-06T04:26:06.0491926172157\x00’
converted to bytearray
outbound_msg = bytearray(msg)
when print the message dump directly, i get this error:
‘utf-8’ codec can’t decode byte 0xbf in position 484: invalid start byte
I believe the error comes from when I convert to a bytearray. Can I directly pass in the msg (bytestring) to the publisher?
Instead of bytearray, I used list () to make it > a bytearray:
outbound_msg = list(msg)
Then I used:
message_dump = message.get_payload_as_bytes()
but I cant seem to receive the same bytestring as what was passed in, cause it cant be processed by my decoder.
Hey @Tamimi !
I’ve figured it out now! Thank you very much!! <3
Fantastic! Thanks for the update @sheanne
Happy coding and feel free to share your findings, projects, and questions here as you continue your exploration and development