Python and publishing to a queue using Solace's API

stushep
stushep Member Posts: 20

Good morning all,

Continuing my Solace journey today: after a lot of reading, I cannot find anything that details how to publish to an actual queue from the Python API.

I have seen a single example that uses a different API (SEMP), but it doesn't make sense to me to use two different connection methods, so I am pursuing the Solace API itself.

After yesterday's faff of connecting to the cloud, I can now use the demos from your git repository with 'relative' ease by modifying the SolaceConstants. However, none of the demos show publishing to a queue where the message will sit and await the subscriber. They all just talk to a topic, which is non-persistent.

i.e.

send message -> no subscriber -> bin it

I am looking for the settings as shown below:

send message -> hold in queue -> subscribe and get message

Which API call or demo shows the 'send message' part of this flow?

For current settings:

  • I have a queue set up, named 'try-me-queue' to match your Try-Me tool
  • The class from your demo that I am currently using to send to a topic (it doesn't seem to work with a queue):

    class HowToDirectPublishMessage:
        @staticmethod
        def direct_message_publish(messaging_service: MessagingService, destination, message):
            """Publish a str or byte-array message as a direct message."""
            # Build outside the try block so the finally clause never sees an unbound name
            direct_publish_service = messaging_service.create_direct_message_publisher_builder() \
                .on_back_pressure_reject(buffer_capacity=0).build()
            try:
                pub_start = direct_publish_service.start_async()
                pub_start.result()  # wait until the publisher has started
                direct_publish_service.publish(destination=destination, message=message)
            finally:
                direct_publish_service.terminate()
    
  • This is called by the main func:

    destination_name = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)
    HowToDirectPublishMessage.direct_message_publish(messaging_service, destination_name, constants.MESSAGE_TO_SEND)
    
  • Is there maybe a Queue.of('My Queue Name Here') option that I am missing?

Thank you.

Answers

  • stushep
    stushep Member Posts: 20

    Have re-asked this question under API broker; I managed to post it in General, which is the wrong area, and I can't see a delete post option.

  • amackenzie
    amackenzie Member, Employee Posts: 270 Solace Employee
    edited January 2022 #4

    It's important to know the core messaging concepts of PubSub+ in order to know which APIs to apply to your use cases.

    https://docs.solace.com/Basics/Core-Concepts.htm

    While you can publish directly to a queue, it is generally best practice to publish to a topic, since one or more queues can subscribe to a topic, which makes it true pub/sub and offers much more flexibility.

    And you can publish to a topic in a persistent way (where you are using the guaranteed messaging facilities) using a persistent publisher.

    https://github.com/SolaceSamples/solace-samples-python/blob/master/howtos/pubsub/how_to_publish_persistent_message.py

    This is a sampler of various permutations of a persistent publisher.

    I hope that helps.

  • stushep
    stushep Member Posts: 20

    Good morning @amackenzie, thank you for the quick reply again.

    I understand the concepts and how a more advanced setup should be done within Solace; I have run through many videos on YouTube about the taxi app etc.

    For my use case, however, I want a simple message sent to a queue, where it sits awaiting a poll.

    Using the more advanced Try-Me tool within the web app under Manage, this is possible: in the left-hand pane I select the queue radio button and send a message, then in the right-hand pane I connect and subscribe to my queue, and the messages are retrieved.

    I have used the sample code for the non-blocking persistent string publisher, and unfortunately it is still sending the message to a topic, not the queue. If I am connected to the broker I receive the message; if I am not, it is lost.

    Please see full code:

    from solace.messaging.messaging_service import MessagingService, RetryStrategy
    from solace.messaging.config.transport_security_strategy import TLS
    from solace.messaging.config import _sol_constants
    from solace.messaging.resources.topic import Topic
    from solace.messaging.utils.converter import ObjectToBytes
    from solace.messaging.utils.manageable import Metric
    
    from solace.messaging.publisher.persistent_message_publisher import PersistentMessagePublisher,MessagePublishReceiptListener
    import time
    from string import Template
    
    class SolaceConstants:
        """this class contains all the constants used through out the project"""
        TOPIC_ENDPOINT_DEFAULT = "try-me-queue"
        # QUEUE_NAME_FORMAT = Template('Q/$iteration')
        # TOPIC_ENDPOINT_1 = "try-me1"
        # TOPIC_ENDPOINT_2 = "try-me2"
        # APPLICATION_MESSAGE_ID = "Solace-App"
        MESSAGE_TO_SEND = "hello Solace...123"
        # ENCODING_TYPE = "utf-8"
        # CUSTOM_PROPS =  {"language": "en-CA", "isEncrypted": "True"}
        # TOPIC_ENDPOINT = "purchase/tickets"
        # GROUP_NAME1 = "analytics"
        # GROUP_NAME2 = "booking"
        # MESSAGE_PRIORITY = 1
        # MESSAGE_EXPIRATION = 5000
        # MESSAGE_SEQUENCE_NUMBER = 12345
        # CLIENT_CERTIFICATE_FILE = 'api-client.pem'
        # KEY_STORE_PASSWORD = "changeme"
        # DEFAULT_TIMEOUT_MS = 5000
    
    class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
        def __init__(self):
            self._publish_count = 0
    
        @property
        def get_publish_count(self):
            return self._publish_count
    
        def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
            with lock:
                self._publish_count += 1
                print(f"\tMessage: {publish_receipt.message}\n"
                      f"\tIs persisted: {publish_receipt.is_persisted}\n"
                      f"\tTimestamp: {publish_receipt.time_stamp}\n"
                      f"\tException: {publish_receipt.exception}\n")
                if publish_receipt.user_context:
                    print(f'\tUsercontext received: {publish_receipt.user_context.get_custom_message}')
    
    
    
    class HowToDirectPublishMessage:
        def direct_message_publish(messaging_service: MessagingService, destination, message):
            """ to publish str or byte array type message"""
    
            try:
                direct_publish_service = messaging_service.create_direct_message_publisher_builder(). \
                    on_back_pressure_reject(buffer_capacity=0).build()
                pub_start = direct_publish_service.start_async()
                pub_start.result()
                direct_publish_service.publish(destination=destination, message=message)
            finally:
                direct_publish_service.terminate()
    
    
        def create_persistent_message_publisher(service: MessagingService) -> 'PersistentMessagePublisher':
            """method to create, build and start persistent publisher"""
            publisher: PersistentMessagePublisher = service.create_persistent_message_publisher_builder().build()
            publisher.start_async()
            print('PERSISTENT publisher started')
            return publisher
    
        def publish_string_message_non_blocking(message_publisher: PersistentMessagePublisher, destination: Topic, message):
            """method to publish string message using persistent message publisher, non-blocking"""
            publish_receipt_listener = MessagePublishReceiptListenerImpl()
            message_publisher.set_message_publish_receipt_listener(publish_receipt_listener)
            message_publisher.publish(message, destination)
            print(f'PERSISTENT publish message is successful... Topic: [{destination.get_name()}]')
            time.sleep(2)
            print(f'Publish receipt count: {publish_receipt_listener.get_publish_count}\n')
    
    
    constants = SolaceConstants
    
    broker_props = {
      "solace.messaging.transport.host": "tcps://**xxxx**.messaging.solace.cloud:55443",
      "solace.messaging.service.vpn-name": "test-service",
      "solace.messaging.authentication.scheme.basic.username": "solace-cloud-client",
      "solace.messaging.authentication.scheme.basic.password": "**xxxx**",
      }
    
    transport_security = TLS.create() \
      .with_certificate_validation(True, validate_server_name=False,
            trust_store_file_path="./pem/")
    
    messaging_service = MessagingService.builder().from_properties(broker_props)\
      .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20,3))\
      .with_transport_security_strategy(transport_security).build() 
    
    
    
    
    try:
        messaging_service.connect()
        print(f'Message service is connected? {messaging_service.is_connected}')
    
        topic = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)
    
        #print("Execute Direct Publish - String")
        #HowToDirectPublishMessage.direct_message_publish(messaging_service, topic, constants.MESSAGE_TO_SEND)
    
        publisher = HowToDirectPublishMessage.create_persistent_message_publisher(messaging_service)
        HowToDirectPublishMessage.publish_string_message_non_blocking(publisher, topic, constants.MESSAGE_TO_SEND)
    
    finally:
        publisher.terminate()
        messaging_service.disconnect()
    

    Up running:

    Thank you for your support :smile:

  • stushep
    stushep Member Posts: 20

    FYI, the exact setup i am trying to mimic in python:

  • stushep
    stushep Member Posts: 20
    edited January 2022 #7

    I have resolved the publisher error: the lock = threading.Lock() variable was missing, so the lock was skipped during the for loop.

    However, my message is still only behaving as a direct message, not a guaranteed one, and I still need to resolve this.

    Thank you.
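
For readers who hit the same missing-lock error: one way to avoid depending on a module-level `lock` is to keep the lock on the listener itself. The sketch below is only an illustration. `CountingReceiptListener` and `simulate_receipts` are hypothetical names, and the class deliberately does not subclass the Solace `MessagePublishReceiptListener` so that it runs without the library installed. In real code you would subclass it as in the posts above.

```python
import threading


class CountingReceiptListener:
    """Counts publish receipts; the lock lives on the instance, not at module level."""

    def __init__(self):
        self._lock = threading.Lock()
        self._publish_count = 0

    @property
    def publish_count(self):
        with self._lock:
            return self._publish_count

    def on_publish_receipt(self, publish_receipt):
        # Receipts may arrive on an API-owned thread, so guard the counter.
        with self._lock:
            self._publish_count += 1


def simulate_receipts(listener, threads=4, per_thread=1000):
    """Hypothetical helper: call the listener from several threads at once."""
    def worker():
        for _ in range(per_thread):
            listener.on_publish_receipt(None)

    pool = [threading.Thread(target=worker) for _ in range(threads)]
    for t in pool:
        t.start()
    for t in pool:
        t.join()
```

Because the lock is created in `__init__`, forgetting a global variable can no longer break the callback.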

  • amackenzie
    amackenzie Member, Employee Posts: 270 Solace Employee
    #8 Answer ✓

    You need the queue to subscribe to that topic. In PubSub+ Manager, go to the queue and select the "subscriptions" tab and add the topic string (can also include wildcards) so that matching topics published to the broker are "attracted" to that queue.
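
To make the idea of a queue "attracting" topics concrete, here is a toy in-memory model in plain Python. It is not the Solace API. `ToyBroker` and `topic_matches` are hypothetical names, and the wildcard handling is a simplified assumption (`*` as a whole level or trailing part of a level, `>` as the last level matching one or more remaining levels). It only shows why a message published to the topic "try-me-queue" is dropped until the queue has a matching subscription.

```python
from collections import deque


def topic_matches(subscription: str, topic: str) -> bool:
    """Simplified, assumed matching rules; the real broker rules are richer."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, sub in enumerate(sub_levels):
        if sub == ">" and i == len(sub_levels) - 1:
            # '>' must cover at least one remaining topic level
            return len(topic_levels) > i
        if i >= len(topic_levels):
            return False
        if sub.endswith("*"):
            if not topic_levels[i].startswith(sub[:-1]):
                return False
        elif sub != topic_levels[i]:
            return False
    return len(sub_levels) == len(topic_levels)


class ToyBroker:
    """Queues hold messages only for topics they subscribe to."""

    def __init__(self):
        self._queues = {}

    def create_queue(self, name):
        self._queues[name] = {"subscriptions": [], "messages": deque()}

    def add_subscription(self, queue_name, subscription):
        self._queues[queue_name]["subscriptions"].append(subscription)

    def publish(self, topic, payload):
        """Store the payload in every matching queue; return how many stored it."""
        stored = 0
        for queue in self._queues.values():
            if any(topic_matches(s, topic) for s in queue["subscriptions"]):
                queue["messages"].append(payload)
                stored += 1
        return stored

    def consume(self, queue_name):
        messages = self._queues[queue_name]["messages"]
        return messages.popleft() if messages else None
```

In this model, creating a queue called "try-me-queue" and publishing to a topic with the same name stores nothing, because a queue name is not a topic subscription. Only after `add_subscription("try-me-queue", "try-me-queue")` does a published message wait in the queue for a consumer.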

  • stushep
    stushep Member Posts: 20

    Hi @amackenzie , perfect, that's the missing piece of the jigsaw, thank you very much :smile::smile:

    For ref, current working code:

    import threading
    import time
    from datetime import datetime
    
    from solace.messaging.config.transport_security_strategy import TLS
    from solace.messaging.messaging_service import MessagingService, RetryStrategy
    from solace.messaging.publisher.persistent_message_publisher import PersistentMessagePublisher, MessagePublishReceiptListener
    from solace.messaging.resources.topic import Topic
    from solace.messaging.utils.manageable import Metric
    
    lock = threading.Lock()
    
    class SolaceConstants:
        TOPIC_ENDPOINT_DEFAULT = "try-me-queue"
        MESSAGE_TO_SEND = "Test Message: "
    
    class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
        def __init__(self):
            self._publish_count = 0
        @property
        def get_publish_count(self):
            return self._publish_count
    
        def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
            with lock:
                self._publish_count += 1
                print(f"\tMessage: {publish_receipt.message}\n"
                    f"\tIs persisted: {publish_receipt.is_persisted}\n"
                    f"\tTimestamp: {publish_receipt.time_stamp}\n"
                    f"\tException: {publish_receipt.exception}\n")
                if publish_receipt.user_context:
                    print(f'\tUsercontext received: {publish_receipt.user_context.get_custom_message}')
    
    class ClsMessagePublisher:
        def create_persistent_message_publisher(service: MessagingService) -> 'PersistentMessagePublisher':
            """method to create, build and start persistent publisher"""
            publisher: PersistentMessagePublisher = service.create_persistent_message_publisher_builder().build()
            publisher.start_async()
            print('PERSISTENT publisher started')
            return publisher
    
        def publish_byte_message_blocking_waiting_for_publisher_confirmation(messaging_service: MessagingService,
                                                                            message_publisher: PersistentMessagePublisher,
                                                                            destination: Topic, message, time_out):
            """method to publish message using persistent message publisher using blocking publish, i.e
            publish_await_acknowledgement and wait for the publisher confirmation"""
            publish_receipt_listener = MessagePublishReceiptListenerImpl()
            message_publisher.set_message_publish_receipt_listener(publish_receipt_listener)
            message_publisher.publish_await_acknowledgement(message, destination, time_out)
            time.sleep(2)
            metrics = messaging_service.metrics()
            print(f'Published message count: {metrics.get_value(Metric.PERSISTENT_MESSAGES_SENT)}\n')
    
    constants = SolaceConstants
    
    # Host and password redacted; substitute your own service's connection details
    broker_props = {
        "solace.messaging.transport.host": "tcps://**xxxx**.messaging.solace.cloud:55443",
        "solace.messaging.service.vpn-name": "test-service",
        "solace.messaging.authentication.scheme.basic.username": "solace-cloud-client",
        "solace.messaging.authentication.scheme.basic.password": "**xxxx**",
    }
    
    ## Pem key stored in sub folder 'pem'
    transport_security = TLS.create().with_certificate_validation(True, validate_server_name=False,trust_store_file_path="./pem/")
    
    messaging_service = MessagingService.builder().from_properties(broker_props) \
        .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20, 3)) \
        .with_transport_security_strategy(transport_security).build()
    
    try:
        messaging_service.connect()
        print(f'Message service is connected? {messaging_service.is_connected}')
        topic = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)
        message = constants.MESSAGE_TO_SEND 
    
        ## Declare a publisher
        publisher = ClsMessagePublisher.create_persistent_message_publisher(messaging_service)
        ## Send message (Message + Datetime for testing)
        ClsMessagePublisher.publish_byte_message_blocking_waiting_for_publisher_confirmation(messaging_service=messaging_service,
                                                                                    message_publisher=publisher,
                                                                                    destination=topic,
                                                                                    message=message+str(datetime.now()),
                                                                                    time_out=2000)
    except Exception as e:
        print("Error occurred:", e)
    finally:
        try:
            publisher.terminate()
            messaging_service.disconnect()
        except Exception as e1:
            print("Error occurred closing:", e1)
    
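
The code above covers the "send message -> hold in queue" half. For the "subscribe and get message" half, a minimal sketch of a polling consumer might look like the following. `drain_queue` is a hypothetical helper, and the receiver method names (`receive_message`, `ack`, `get_payload_as_string`) reflect my understanding of the Solace Python API's persistent message receiver; please check them against the version you have installed.

```python
def drain_queue(receiver, timeout_ms=1000):
    """Receive and acknowledge messages until none arrives within timeout_ms.

    `receiver` is assumed to behave like a started persistent message receiver:
    receive_message(timeout) returns a message or None, and ack(message)
    tells the broker the message can be removed from the queue.
    """
    payloads = []
    while True:
        message = receiver.receive_message(timeout_ms)
        if message is None:
            break  # nothing left in the queue within the timeout
        payloads.append(message.get_payload_as_string())
        receiver.ack(message)
    return payloads


# Assumed wiring against a connected messaging_service (not executed here):
#
# from solace.messaging.resources.queue import Queue
# receiver = messaging_service.create_persistent_message_receiver_builder() \
#     .build(Queue.durable_exclusive_queue("try-me-queue"))
# receiver.start()
# print(drain_queue(receiver))
# receiver.terminate()
```

Passing the receiver in, rather than building it inside the function, keeps the polling logic easy to exercise with a stand-in object before pointing it at a real broker.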
