How do I publish a bytearray on the Solace Direct Publisher?
Hello everyone! I'm trying to publish a bytearray instead of an outbound message object on the direct publisher, but either nothing is being published or the subscriber is mishandling the received message. I can't figure out which.
Here is my publisher script:
    def publish_one(self, msg, _topic):
        try:
            topic = Topic.of(TOPIC_PREFIX + _topic)
            # # msg_builder
            # outbound_msg = self.outbound_msg_builder \
            #     .with_application_message_id(f'{_topic}') \
            #     .build(f'{msg}')

            # msg is a byte string, converted to a byte array
            outbound_msg = bytearray(msg)
            self.direct_publisher.publish(destination=topic, message=outbound_msg)
        except KeyboardInterrupt:
            print('\nTerminating Publisher')
            self.direct_publisher.terminate()
            print('\nDisconnecting Messaging Service')
            self.msg_service.disconnect()
On a specific topic, I want to retrieve the published bytearrays and decode them using my decoder. I intend to convert each bytearray back to a bytestring and then decode it in a separate script. Here is my subscriber message handler script, which temporarily prints the message to the terminal.
    class MessageHandlerImpl(MessageHandler):
        def on_message(self, message):
            b = ''.join(chr(x) for x in message)
            print(f'\n Message dump: {message_dump}')
Would appreciate the help! Thank you!
Answers
-
Hey @sheanne,
I was able to run a simple script locally and it worked with passing a byte array. A couple of questions:
- What is the encoding that you are passing to bytearray in outbound_msg = bytearray(msg)?
- In your MessageHandlerImpl, what do you get if you dump the message directly with print(f'\n Message dump: {message}') (instead of using the message_dump variable)?
You can also get the payload as bytes in your MessageHandlerImpl as follows:

    class MessageHandlerImpl(MessageHandler):
        def on_message(self, message: InboundMessage):
            payload_byte = message.get_payload_as_bytes()
            # print("\n" + f"Message Payload String: {message.get_payload_as_string()} \n")
            # print("\n" + f"Message Topic: {message.get_destination_name()} \n")
            print("\n" + f"Message dump: {message} \n")
            print("\n" + f"Message byte: {payload_byte} \n")
Running the following I am able to dump the message and the byte payload
    # Build a Receiver with the given topics and start it
    direct_receiver = messaging_service.create_direct_message_receiver_builder()\
        .with_subscriptions(TopicSubscription.of("test/>"))\
        .build()
    direct_receiver.start()
    direct_receiver.receive_async(MessageHandlerImpl())

    # Create a direct message publisher and start it
    direct_publisher = messaging_service.create_direct_message_publisher_builder().build()
    # Blocking Start thread
    direct_publisher.start()

    topic = Topic.of("test/topic")
    outbound_msg = bytearray("Hello world", encoding='utf8')
    try:
        try:
            while True:
                direct_publisher.publish(destination=topic, message=outbound_msg)
                time.sleep(1)
        except KeyboardInterrupt:
            print('\nDisconnecting Messaging Service')
    finally:
        print('\nTerminating receiver')
        direct_receiver.terminate()
        print('\nTerminating Publisher')
        direct_publisher.terminate()
        print('\nDisconnecting Messaging Service')
        messaging_service.disconnect()
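The working example above passes an explicit encoding when it builds the bytearray from a string. The following is a minimal pure-Python sketch of that round trip, with no Solace calls involved; the helper names to_payload and from_payload are hypothetical and exist only for illustration.

```python
def to_payload(text: str, encoding: str = "utf8") -> bytearray:
    """Encode text into a bytearray; a str needs an explicit encoding."""
    return bytearray(text, encoding=encoding)


def from_payload(payload, encoding: str = "utf8") -> str:
    """Decode received bytes (bytes or bytearray) back into text."""
    return bytes(payload).decode(encoding)


payload = to_payload("Hello world")
print(payload)
print(from_payload(payload))

# Without an encoding, bytearray() rejects a str outright.
try:
    bytearray("Hello world")
except TypeError as err:
    print(f"TypeError: {err}")
```

If msg is already a bytes object, bytearray(msg) needs no encoding argument and simply copies the bytes.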
-
Hey @Tamimi,
The message is a json dictionary encoded using ASCII through simple binary encoding.
    # encoded message bytestring, encoded using ASCII through simple binary encoding
    msg = 'x00Deposit\x00\x00\x00ftx\x00\x00\x00\x00\x00\x00\x00USD\x00obHN&`C@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002022-01-06T04:26:06.0491926172157\x00'
    # converted to bytearray
    outbound_msg = bytearray(msg)
When I print the message dump directly, I get this error:
'utf-8' codec can't decode byte 0xbf in position 484: invalid start byte
I believe the error comes from when I convert to a bytearray. Can I directly pass in the msg (bytestring) to the publisher?
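For context, this is the error Python raises when binary data containing a byte such as 0xbf is decoded as UTF-8 text. That suggests, as an assumption not confirmed in this thread, that printing the message dump tries to render the payload as a string. The sketch below reproduces the error in plain Python and shows a way to inspect a binary payload without decoding it; the helper names and the sample bytes are hypothetical.

```python
def try_utf8(payload):
    """Return the decoded text, or None if the payload is not valid UTF-8."""
    try:
        return bytes(payload).decode("utf-8")
    except UnicodeDecodeError:
        return None


def safe_dump(payload) -> str:
    """Render a binary payload as hex so it can be printed without decoding."""
    return bytes(payload).hex()


# Illustrative bytes only, not the real message from this thread.
binary_payload = bytearray(b"\x00Deposit\x00\xbf")

print(try_utf8(binary_payload))   # decoding fails because 0xbf cannot start a UTF-8 sequence
print(safe_dump(binary_payload))  # hex view works for any byte values
```

Printing the hex form (or the repr of the bytes) on both the publisher and subscriber side makes it possible to compare what was sent with what arrived, independent of any text encoding.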
Instead of bytearray, I used list() to make it a bytearray:
outbound_msg = list(msg)
Then I used:
message_dump = message.get_payload_as_bytes()
but I can't seem to receive the same bytestring that was passed in, because my decoder can't process it.
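One possible cause, assuming msg is a Python str rather than a bytes object: list(msg) produces a list of one-character strings, not byte values, so the payload no longer matches the original bytes. The sketch below converts such a string with latin-1, which maps every code point below 256 to exactly one byte, and then checks the round trip in plain Python. The helper name str_to_bytes and the sample data are hypothetical, and no Solace calls are made.

```python
def str_to_bytes(msg: str) -> bytes:
    """Map each character (code point < 256) to one byte with the same value."""
    return msg.encode("latin-1")


# Illustrative data only, not the real message from this thread.
msg = "\x00Deposit\x00\xbf"

print(list(msg)[:3])                 # elements are str objects, not integers

original = str_to_bytes(msg)
outbound_msg = bytearray(original)   # the object that would be handed to the publisher
received = bytes(outbound_msg)       # stand-in for the bytes the subscriber gets back
print(received == original)
```

If msg is already a bytes object, no conversion is needed: bytearray(msg) copies it as is, and bytes() on the received payload should compare equal to the original.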