How do I publish a bytearray on the Solace Direct Publisher?

sheanne
sheanne Member Posts: 13

Hello everyone! I'm trying to publish a bytearray instead of an outbound message object on the direct publisher, but either nothing is being published or the subscriber is mishandling the received message. I can't figure out which.

Here is my publisher script:

def publish_one(self, msg, _topic):
    try:
        topic = Topic.of(TOPIC_PREFIX + _topic)

        # msg_builder
        # outbound_msg = self.outbound_msg_builder \
        #     .with_application_message_id(f'{_topic}') \
        #     .build(f'{msg}')

        # msg is a byte string, converted to a byte array
        outbound_msg = bytearray(msg)
        self.direct_publisher.publish(destination=topic, message=outbound_msg)

    except KeyboardInterrupt:
        print('\nTerminating Publisher')
        self.direct_publisher.terminate()
        print('\nDisconnecting Messaging Service')
        self.msg_service.disconnect()
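
For reference, here is a minimal sketch of how Python's built-in bytearray() treats different inputs, independent of the Solace API. The payload value is a stand-in for illustration and is not the message from this thread.

```python
# Stand-in payload for illustration only (not the original message).
raw = b"\x00Deposit\x00\x00USD\x00"

# bytes -> bytearray needs no encoding and preserves every byte.
from_bytes = bytearray(raw)

# str -> bytearray needs an explicit encoding.
from_text = bytearray("Hello world", encoding="utf-8")

# A str without an encoding is rejected by Python itself.
try:
    bytearray("Hello world")
    str_without_encoding_ok = True
except TypeError:
    str_without_encoding_ok = False

print(bytes(from_bytes) == raw)
print(str_without_encoding_ok)
```

If msg is already a bytes object, the conversion should be lossless. If it is a str, the chosen encoding decides which bytes end up on the wire.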

On a specific topic, I want to retrieve the published bytearrays and decode them with my decoder. I intend to convert each bytearray back to a bytestring and then decode it in a separate script. Here is my subscriber's message handler, which temporarily prints the message to the terminal.

class MessageHandlerImpl(MessageHandler):
  def on_message(self, message):
    b = ''.join(chr(x) for x in message)
    print(f'\n Message dump: {message_dump}')
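
As a rough illustration of the conversion step in plain Python: joining chr() values produces a str, whereas bytes() keeps the data binary. The payload below is a hypothetical stand-in for whatever bytes-like object the handler ends up with, not a real received message.

```python
# Stand-in for the received payload (hypothetical value, not from the thread).
payload = bytearray(b"\x00Deposit\xbf\x00USD")

# chr() maps each byte value to a Unicode character, so the result is a str.
as_text = "".join(chr(x) for x in payload)

# bytes() copies the same byte values into an immutable bytes object.
as_bytes = bytes(payload)

print(type(as_text).__name__, type(as_bytes).__name__)
print(as_bytes == b"\x00Deposit\xbf\x00USD")
```

A decoder that expects a bytestring would most likely want the bytes() form rather than the joined string.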

Would appreciate the help! Thank you!

Answers

  • Tamimi
    Tamimi Member, Administrator, Employee Posts: 538 admin
    edited February 2022 #2

    Hey @sheanne,

    I was able to run a simple script locally and it worked with passing a byte array. A couple of questions:

    1. What is the encoding that you are passing to bytearray in outbound_msg = bytearray(msg) ?
    2. In your MessageHandlerImpl, what do you get if you dump the message directly print(f'\n Message dump: {message}') (instead of using the message_dump variable)?

    You can also get the payload as bytes in your MessageHandlerImpl, as follows:

    class MessageHandlerImpl(MessageHandler):
        def on_message(self, message: InboundMessage):
            payload_byte = message.get_payload_as_bytes()
            # print("\n" + f"Message Payload String: {message.get_payload_as_string()} \n")
            # print("\n" + f"Message Topic: {message.get_destination_name()} \n")
            print("\n" + f"Message dump: {message} \n")
            print("\n" + f"Message byte: {payload_byte} \n")
    

    Running the following, I am able to dump the message and the byte payload:

    # Build a Receiver with the given topics and start it
    direct_receiver = messaging_service.create_direct_message_receiver_builder()\
        .with_subscriptions(TopicSubscription.of("test/>"))\
        .build()
    
    direct_receiver.start()
    direct_receiver.receive_async(MessageHandlerImpl())
    
    # Create a direct message publisher and start it
    direct_publisher = messaging_service.create_direct_message_publisher_builder().build()
    
    # Blocking Start thread
    direct_publisher.start()
    
    topic = Topic.of("test/topic")
    outbound_msg = bytearray("Hello world", encoding='utf8')
    
    try:
        try:
            while True:
                direct_publisher.publish(destination=topic, message=outbound_msg)
                time.sleep(1)
        except KeyboardInterrupt:
            print('\nDisconnecting Messaging Service')
    finally:
        print('\nTerminating receiver')
        direct_receiver.terminate()
        print('\nTerminating Publisher')
        direct_publisher.terminate()
        print('\nDisconnecting Messaging Service')
        messaging_service.disconnect()
    
    
    


  • sheanne
    sheanne Member Posts: 13
    edited February 2022 #3

    Hey @Tamimi,

    The message is a JSON dictionary encoded using ASCII through simple binary encoding.

    # encoded message bytestring, encoded using ASCII through simple binary encoding
    msg = 'x00Deposit\x00\x00\x00ftx\x00\x00\x00\x00\x00\x00\x00USD\x00obHN&`C@\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x002022-01-06T04:26:06.0491926172157\x00'
    # converted to bytearray
    outbound_msg = bytearray(msg)
    

    When I print the message dump directly, I get this error:

    'utf-8' codec can't decode byte 0xbf in position 484: invalid start byte
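
This kind of error can be reproduced in plain Python whenever binary data is decoded as UTF-8. The sketch below uses a hypothetical payload containing 0xbf (a byte that cannot start a UTF-8 sequence). It does not claim to show where the decode happens inside the Solace API.

```python
# Hypothetical binary payload containing 0xbf.
payload = b"USD\x00\xbf\x00"

# Decoding arbitrary binary data as UTF-8 can fail.
try:
    payload.decode("utf-8")
    error_text = None
except UnicodeDecodeError as exc:
    error_text = str(exc)

print(error_text)

# Keeping the data as bytes (here shown as hex) avoids the decode step entirely.
hex_view = payload.hex()
print(hex_view)
```

For inspection, a hex view or the raw bytes repr is usually safer than treating a binary payload as text.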
    

    I believe the error occurs when I convert to a bytearray. Can I pass msg (the bytestring) directly to the publisher?


    Instead of bytearray(), I also tried list() to convert the message:

    outbound_msg = list(msg) 
    

    Then I used:

    message_dump = message.get_payload_as_bytes()

    but I can't seem to receive the same bytestring that was passed in, because my decoder can't process what arrives.
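
For comparison, a small plain-Python sketch of what list() and bytearray() each produce from a bytestring, and how either converts back. The value is a stand-in, and the sketch does not check whether the publisher accepts a plain list.

```python
# Stand-in bytestring for illustration only.
raw = b"\x00ftx\x00USD"

# list() yields a list of integers, one per byte; it is not a bytearray.
as_list = list(raw)

# bytearray() yields a mutable sequence of the same byte values.
as_array = bytearray(raw)

print(type(as_list).__name__, type(as_array).__name__)

# Both can be turned back into the original bytestring.
print(bytes(as_list) == raw, bytes(as_array) == raw)
```

A quick equality check like this on both sides of the publish/receive path is one way to see where the bytes start to differ.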

  • sheanne
    sheanne Member Posts: 13

    Hey @Tamimi !

    I've figured it out now! Thank you very much!! <3

  • Tamimi
    Tamimi Member, Administrator, Employee Posts: 538 admin

    Fantastic! Thanks for the update @sheanne

    Happy coding and feel free to share your findings, projects, and questions here as you continue your exploration and development :)