Python and Publishing to a Queue Using Solace's API

stushep
stushep Member Posts: 20

Good morning all,

Continuing my Solace journey today: after a lot of reading, I cannot find anywhere that details how to publish to an actual queue from the Python API.

I have seen a single example which uses a different SEMP API, but it doesn't make sense to me to use two different connection methodologies, so I am pursuing the Solace API itself.

After yesterday's faff of connecting to the cloud, I can now use the demos from your git repository with 'relative' ease by modifying the SolaceConstants. However, none of the demos show publishing to a queue where the message will sit and await the subscriber. They all just talk to a topic, which is non-persistent.

i.e.

send message -> no subscriber -> bin it

I am looking for the settings as shown below:

send message -> hold in queue -> subscribe and get message

Which API call or demo do you have that shows the send-message part?

For current settings:

  • I have a queue set up matching your Try-Me, called 'try-me-queue'
  • The class currently being used from your demo to send to a Topic (doesn't seem to work with a queue):

    class HowToDirectPublishMessage:
        @staticmethod
        def direct_message_publish(messaging_service: MessagingService, destination, message):
            """ to publish str or byte array type message"""
            # Build the publisher outside try so `finally` never sees an unbound name
            direct_publish_service = messaging_service.create_direct_message_publisher_builder() \
                .on_back_pressure_reject(buffer_capacity=0).build()
            try:
                # Wait for the asynchronous start to complete before publishing
                pub_start = direct_publish_service.start_async()
                pub_start.result()
                direct_publish_service.publish(destination=destination, message=message)
            finally:
                direct_publish_service.terminate()
    
  • This is called by the main func:

    destination_name = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)
    HowToDirectPublishMessage.direct_message_publish(messaging_service, destination_name, constants.MESSAGE_TO_SEND)
    
  • Is there maybe a Queue.of('My Queue Name Here') option which is not obvious?

Thank you.

Answers

  • stushep
    stushep Member Posts: 20

    I have re-asked this question under API broker. I managed to ask in General, which is the incorrect area, and I am unable to see a delete-post option.

  • amackenzie
    amackenzie Member, Employee Posts: 268 Solace Employee
    edited January 2022 #4

    It's important to know the core messaging concepts of PubSub+ in order to know which APIs to apply to your use cases.

    https://docs.solace.com/Basics/Core-Concepts.htm

    While you can publish directly to a queue, it is generally best practice to publish to a topic, because a topic can be subscribed to by one or more queues, which makes it true pub/sub and offers much more flexibility.

    And you can publish to a topic in a persistent way (where you are using the guaranteed messaging facilities) using a persistent publisher.

    https://github.com/SolaceSamples/solace-samples-python/blob/master/howtos/pubsub/how_to_publish_persistent_message.py

    This is a sampler of various permutations of a persistent publisher.

    I hope that helps.
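
To make the topic-to-queue idea above concrete, here is a minimal toy model in plain Python. It is not the Solace API: `ToyBroker`, its methods, and the topic name `try-me-topic` are all hypothetical, and it only handles exact topic matches. It just sketches why a message published to a topic is discarded when nothing is subscribed, and held when a queue has a subscription for that topic.

```python
from collections import defaultdict, deque


class ToyBroker:
    """Hypothetical in-memory stand-in for a broker; NOT the Solace API."""

    def __init__(self):
        self.queues = {}                              # queue name -> held messages
        self.queue_subscriptions = defaultdict(set)   # topic -> names of subscribed queues

    def create_queue(self, name):
        self.queues[name] = deque()

    def add_queue_subscription(self, queue_name, topic):
        # Comparable in spirit to adding a topic on the queue's subscriptions tab
        self.queue_subscriptions[topic].add(queue_name)

    def publish(self, topic, message):
        # Every queue subscribed to this topic keeps its own copy.
        # With no matching subscription the message goes nowhere.
        for queue_name in self.queue_subscriptions.get(topic, ()):
            self.queues[queue_name].append(message)

    def poll(self, queue_name):
        # Return the oldest held message, or None if the queue is empty
        queue = self.queues[queue_name]
        return queue.popleft() if queue else None


broker = ToyBroker()
broker.create_queue("try-me-queue")

broker.publish("try-me-topic", "message 1")   # no subscription yet: nothing is kept
print(broker.poll("try-me-queue"))            # None

broker.add_queue_subscription("try-me-queue", "try-me-topic")
broker.publish("try-me-topic", "message 2")   # now a copy is held in the queue
print(broker.poll("try-me-queue"))            # message 2
```

Because subscriptions live on the queue rather than on the publisher, several queues could subscribe to the same topic and each would receive its own copy, which is the flexibility described above.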

  • stushep
    stushep Member Posts: 20

    Good morning @amackenzie, thank you for the quick reply again.

    I understand the concepts and how a more advanced setup should be done within Solace; I have run through many videos on YouTube about the taxi app etc.

    For my use case, however, I want a simple message sent to a queue, where it sits awaiting a poll.

    Using the more advanced Try-Me tool within the web app under Manage, this is possible: within the left-hand pane I select the queue radio button and send a message, then within the right-hand pane I connect and subscribe to my queue, and the messages are retrieved.

    I have used the sample code for publishing a string with the non-blocking persistent publisher, and unfortunately it is still sending the message to a topic, not the queue. If I am connected to the broker I receive the message; if I am not, it is lost.

    Please see full code:

    from solace.messaging.messaging_service import MessagingService,RetryStrategy
    from solace.messaging.config.transport_security_strategy import TLS
    from solace.messaging.config import _sol_constants
    from solace.messaging.messaging_service import MessagingService
    from solace.messaging.resources.topic import Topic
    from solace.messaging.utils.converter import ObjectToBytes
    from solace.messaging.utils.manageable import Metric
    
    from solace.messaging.publisher.persistent_message_publisher import PersistentMessagePublisher,MessagePublishReceiptListener
    import time
    from string import Template
    
    class SolaceConstants:
        """this class contains all the constants used through out the project"""
        TOPIC_ENDPOINT_DEFAULT = "try-me-queue"
        # QUEUE_NAME_FORMAT = Template('Q/$iteration')
        # TOPIC_ENDPOINT_1 = "try-me1"
        # TOPIC_ENDPOINT_2 = "try-me2"
        # APPLICATION_MESSAGE_ID = "Solace-App"
        MESSAGE_TO_SEND = "hello Solace...123"
        # ENCODING_TYPE = "utf-8"
        # CUSTOM_PROPS =  {"language": "en-CA", "isEncrypted": "True"}
        # TOPIC_ENDPOINT = "purchase/tickets"
        # GROUP_NAME1 = "analytics"
        # GROUP_NAME2 = "booking"
        # MESSAGE_PRIORITY = 1
        # MESSAGE_EXPIRATION = 5000
        # MESSAGE_SEQUENCE_NUMBER = 12345
        # CLIENT_CERTIFICATE_FILE = 'api-client.pem'
        # KEY_STORE_PASSWORD = "changeme"
        # DEFAULT_TIMEOUT_MS = 5000
    
    class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
        def __init__(self):
            self._publish_count = 0
    
        @property
        def get_publish_count(self):
            return self._publish_count
    
        def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
            with lock:
                self._publish_count += 1
                print(f"\tMessage: {publish_receipt.message}\n"
                      f"\tIs persisted: {publish_receipt.is_persisted}\n"
                      f"\tTimestamp: {publish_receipt.time_stamp}\n"
                      f"\tException: {publish_receipt.exception}\n")
                if publish_receipt.user_context:
                    print(f'\tUsercontext received: {publish_receipt.user_context.get_custom_message}')
    
    
    
    class HowToDirectPublishMessage:
        def direct_message_publish(messaging_service: MessagingService, destination, message):
            """ to publish str or byte array type message"""
    
            try:
                direct_publish_service = messaging_service.create_direct_message_publisher_builder(). \
                    on_back_pressure_reject(buffer_capacity=0).build()
                pub_start = direct_publish_service.start_async()
                pub_start.result()
                direct_publish_service.publish(destination=destination, message=message)
            finally:
                direct_publish_service.terminate()
    
    
        def create_persistent_message_publisher(service: MessagingService) -> 'PersistentMessagePublisher':
            """method to create, build and start persistent publisher"""
            publisher: PersistentMessagePublisher = service.create_persistent_message_publisher_builder().build()
            publisher.start_async()
            print('PERSISTENT publisher started')
            return publisher
    
        def publish_string_message_non_blocking(message_publisher: PersistentMessagePublisher, destination: Topic, message):
            """method to publish string message using persistent message publisher, non-blocking"""
            publish_receipt_listener = MessagePublishReceiptListenerImpl()
            message_publisher.set_message_publish_receipt_listener(publish_receipt_listener)
            message_publisher.publish(message, destination)
            print(f'PERSISTENT publish message is successful... Topic: [{destination.get_name()}]')
            time.sleep(2)
            print(f'Publish receipt count: {publish_receipt_listener.get_publish_count}\n')
    
    
    constants = SolaceConstants
    
    broker_props = {
      "solace.messaging.transport.host": "tcps://**xxxx**.messaging.solace.cloud:55443",
      "solace.messaging.service.vpn-name": "test-service",
      "solace.messaging.authentication.scheme.basic.username": "solace-cloud-client",
      "solace.messaging.authentication.scheme.basic.password": "**xxxx**",
      }
    
    transport_security = TLS.create() \
      .with_certificate_validation(True, validate_server_name=False,
            trust_store_file_path="./pem/")
    
    messaging_service = MessagingService.builder().from_properties(broker_props)\
      .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20,3))\
      .with_transport_security_strategy(transport_security).build() 
    
    
    
    
    try:
        messaging_service.connect()
        print(f'Message service is connected? {messaging_service.is_connected}')
    
        topic = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)
    
        #print("Execute Direct Publish - String")
        #HowToDirectPublishMessage.direct_message_publish(messaging_service, topic, constants.MESSAGE_TO_SEND)
    
        publisher = HowToDirectPublishMessage.create_persistent_message_publisher(messaging_service)
        HowToDirectPublishMessage.publish_string_message_non_blocking(publisher, topic, constants.MESSAGE_TO_SEND)
    
    finally:
        publisher.terminate()
        messaging_service.disconnect()
    

    Up running:

    Thank you for your support :smile:

  • stushep
    stushep Member Posts: 20

    FYI, the exact setup I am trying to mimic in Python:

  • stushep
    stushep Member Posts: 20
    edited January 2022 #7

    I have resolved the publisher error: I was missing the lock = threading.Lock() variable, therefore the lock was skipped during the for loop...

    However, my message is still only acting as a direct message, not a guaranteed one, and I still need to resolve this.

    Thank you.
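
For anyone hitting the same missing-lock error: the receipt listener in the earlier listing uses `with lock:`, so a module-level `lock` has to exist before any callback runs, otherwise Python raises a NameError at that line. The sketch below isolates that pattern in plain Python, with no Solace imports. `ReceiptCounter` and the fake receipt strings are hypothetical stand-ins for the listener and its receipts.

```python
import threading

# Module-level lock shared by every callback invocation
lock = threading.Lock()


class ReceiptCounter:
    """Hypothetical stand-in for the receipt listener; it only counts callbacks."""

    def __init__(self):
        self._publish_count = 0

    @property
    def publish_count(self):
        return self._publish_count

    def on_publish_receipt(self, receipt):
        # Without the `lock = threading.Lock()` line above, this raises NameError
        with lock:
            self._publish_count += 1


counter = ReceiptCounter()

# Simulate callbacks arriving from several threads at once
threads = [
    threading.Thread(target=counter.on_publish_receipt, args=(f"receipt-{i}",))
    for i in range(50)
]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()

print(counter.publish_count)  # 50
```

The lock matters because the callbacks may run on a different thread from the one that publishes, so the counter update should not be interleaved.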

  • amackenzie
    amackenzie Member, Employee Posts: 268 Solace Employee
    #8 Answer ✓

    You need the queue to subscribe to that topic. In PubSub+ Manager, go to the queue and select the "subscriptions" tab and add the topic string (can also include wildcards) so that matching topics published to the broker are "attracted" to that queue.
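
As a rough illustration of the wildcard remark above, the following plain-Python sketch approximates how a queue's topic subscription might be compared with a published topic. It is a simplified assumption of the matching rules (`*` as a prefix wildcard within one level, `>` as a final level matching one or more remaining levels), not the broker's actual implementation; `topic_matches` and the example topic strings are hypothetical, and the Solace documentation is the authority on the exact rules.

```python
def topic_matches(subscription: str, topic: str) -> bool:
    """Simplified, assumed approximation of topic subscription matching."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")

    for i, sub in enumerate(sub_levels):
        # A trailing '>' matches one or more remaining levels
        if sub == ">" and i == len(sub_levels) - 1:
            return len(topic_levels) > i
        # The subscription has more levels than the topic
        if i >= len(topic_levels):
            return False
        level = topic_levels[i]
        if sub.endswith("*"):
            # '*' at the end of a level matches any suffix within that level
            if not level.startswith(sub[:-1]):
                return False
        elif sub != level:
            return False

    # Without a trailing '>', both must have the same number of levels
    return len(sub_levels) == len(topic_levels)


print(topic_matches("try-me-queue", "try-me-queue"))          # True
print(topic_matches("orders/*/created", "orders/eu/created"))  # True
print(topic_matches("orders/>", "orders/eu/created"))          # True
print(topic_matches("orders/*", "orders/eu/created"))          # False
```

In the thread's case the topic string is simply `try-me-queue`, so an exact-match subscription on the queue is enough.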

  • stushep
    stushep Member Posts: 20

    Hi @amackenzie , perfect, that's the missing piece of the jigsaw, thank you very much :smile::smile:

    For ref, current working code:

    from solace.messaging.messaging_service import MessagingService, RetryStrategy
    from solace.messaging.config.transport_security_strategy import TLS
    from solace.messaging.resources.topic import Topic
    from solace.messaging.utils.manageable import Metric
    from solace.messaging.publisher.persistent_message_publisher import PersistentMessagePublisher, MessagePublishReceiptListener
    import time
    import threading
    from datetime import datetime
    
    lock = threading.Lock()
    
    class SolaceConstants:
        TOPIC_ENDPOINT_DEFAULT = "try-me-queue"
        MESSAGE_TO_SEND = "Test Message: "
    
    class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
        def __init__(self):
            self._publish_count = 0
        @property
        def get_publish_count(self):
            return self._publish_count
    
        def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
            with lock:
                self._publish_count += 1
                print(f"\tMessage: {publish_receipt.message}\n"
                    f"\tIs persisted: {publish_receipt.is_persisted}\n"
                    f"\tTimestamp: {publish_receipt.time_stamp}\n"
                    f"\tException: {publish_receipt.exception}\n")
                if publish_receipt.user_context:
                    print(f'\tUsercontext received: {publish_receipt.user_context.get_custom_message}')
    
    class ClsMessagePublisher:
        def create_persistent_message_publisher(service: MessagingService) -> 'PersistentMessagePublisher':
            """method to create, build and start persistent publisher"""
            publisher: PersistentMessagePublisher = service.create_persistent_message_publisher_builder().build()
            publisher.start_async()
            print('PERSISTENT publisher started')
            return publisher
    
        def publish_byte_message_blocking_waiting_for_publisher_confirmation(messaging_service: MessagingService,
                                                                            message_publisher: PersistentMessagePublisher,
                                                                            destination: Topic, message, time_out):
            """method to publish message using persistent message publisher using blocking publish, i.e
            publish_await_acknowledgement and wait for the publisher confirmation"""
            publish_receipt_listener = MessagePublishReceiptListenerImpl()
            message_publisher.set_message_publish_receipt_listener(publish_receipt_listener)
            message_publisher.publish_await_acknowledgement(message, destination, time_out)
            time.sleep(2)
            metrics = messaging_service.metrics()
            print(f'Published message count: {metrics.get_value(Metric.PERSISTENT_MESSAGES_SENT)}\n')
    
    constants = SolaceConstants
    
    broker_props = {
        "solace.messaging.transport.host": "tcps://**xxxx**.messaging.solace.cloud:55443",
        "solace.messaging.service.vpn-name": "test-service",
        "solace.messaging.authentication.scheme.basic.username": "solace-cloud-client",
        "solace.messaging.authentication.scheme.basic.password": "**xxxx**",
    }
    
    ## Pem key stored in sub folder 'pem'
    transport_security = TLS.create().with_certificate_validation(True, validate_server_name=False,trust_store_file_path="./pem/")
    
    messaging_service = MessagingService.builder().from_properties(broker_props)\
    .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20,3))\
    .with_transport_security_strategy(transport_security).build() 
    
    try:
        messaging_service.connect()
        print(f'Message service is connected? {messaging_service.is_connected}')
        topic = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)
        message = constants.MESSAGE_TO_SEND 
    
        ## Declare a publisher
        publisher = ClsMessagePublisher.create_persistent_message_publisher(messaging_service)
        ## Send message (Message + Datetime for testing)
        ClsMessagePublisher.publish_byte_message_blocking_waiting_for_publisher_confirmation(messaging_service=messaging_service,
                                                                                    message_publisher=publisher,
                                                                                    destination=topic,
                                                                                    message=message+str(datetime.now()),
                                                                                    time_out=2000)
    except Exception as e:
        print("Error occurred:", e)
    finally:
        try:
            publisher.terminate()
            messaging_service.disconnect()
        except Exception as e1:
            print("Error occurred closing:", e1)