Python and Publishing to a Queue using Solace's API

Good morning all,

Continuing my Solace journey today: after a lot of reading, I cannot find anywhere that details how to publish to an actual queue from the Python API.

I have seen a single example that uses a different API (SEMP), but it doesn't make sense to me to use two different connection methods, so I am pursuing the Solace API itself.

After yesterday's faff of connecting to the cloud, I can now use the demos from your Git repository with ‘relative’ ease by modifying the SolaceConstants… However, none of the demos show publishing to a queue where the message will sit and await the subscriber. They all just talk to a topic, which is non-persistent.

i.e.

send message → no subscriber → bin it

I am looking for the settings as shown below:

send message → hold in queue → subscribe and get message

Which API call or demo shows the send-message part of this?

For current settings:

  • I have a queue set up, matching your try-me, called ‘try-me-queue’

  • The class currently being used from your demo to send to a Topic (doesn't seem to work with a queue)

      class HowToDirectPublishMessage:
          @staticmethod
          def direct_message_publish(messaging_service: MessagingService, destination, message):
              """Publish a str or byte array message with a direct (non-persistent) publisher."""
              # Build the publisher before the try block so the finally clause can always reference it.
              direct_publish_service = messaging_service.create_direct_message_publisher_builder() \
                  .on_back_pressure_reject(buffer_capacity=0).build()
              try:
                  # Wait for the publisher to start before publishing.
                  direct_publish_service.start_async().result()
                  direct_publish_service.publish(destination=destination, message=message)
              finally:
                  direct_publish_service.terminate()
    
  • This is called by the main func:

      destination_name = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)
      HowToDirectPublishMessage.direct_message_publish(messaging_service, destination_name, constants.MESSAGE_TO_SEND)
    
  • Is there maybe a Queue.of(‘My Queue Name Here’) option which is not obvious?

Thank you.

I have re-asked this question under API broker… I managed to ask it in General, which is the incorrect area, and I am unable to see a delete-post option.

It’s important to know the core messaging concepts of PubSub+ in order to know which APIs to apply to your use cases.

While you can publish directly to a queue, it is generally best practice to publish to a topic, because a topic can be subscribed to by one or more queues. That makes it true pub/sub and offers much more flexibility.

And you can publish to a topic in a persistent way (where you are using the guaranteed messaging facilities) using a persistent publisher.
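To make the topic-versus-queue behaviour concrete, here is a toy in-memory model of it. It is only a sketch and does not use the Solace API: the `ToyBroker` class, its method names, and the topic name `try-me` are all hypothetical, and a real broker does far more. It shows a message being discarded when nothing is listening, and being held once a queue has a subscription for the topic.

```python
from collections import deque


class ToyBroker:
    """In-memory stand-in for a broker (hypothetical; not the Solace API)."""

    def __init__(self):
        self.queues = {}            # queue name -> deque of held messages
        self.subscriptions = {}     # queue name -> set of topic strings
        self.live_subscribers = {}  # topic -> callbacks of connected clients

    def create_queue(self, name):
        self.queues[name] = deque()
        self.subscriptions[name] = set()

    def add_queue_subscription(self, queue_name, topic):
        self.subscriptions[queue_name].add(topic)

    def subscribe_direct(self, topic, callback):
        self.live_subscribers.setdefault(topic, []).append(callback)

    def publish(self, topic, payload):
        """Return how many destinations received the message."""
        delivered = 0
        # Every queue subscribed to the topic keeps its own copy.
        for name, topics in self.subscriptions.items():
            if topic in topics:
                self.queues[name].append(payload)
                delivered += 1
        # Clients connected right now get the message immediately.
        for callback in self.live_subscribers.get(topic, []):
            callback(payload)
            delivered += 1
        return delivered  # 0 means the message was discarded

    def consume(self, queue_name):
        """Pop the oldest held message, or None if the queue is empty."""
        held = self.queues[queue_name]
        return held.popleft() if held else None


broker = ToyBroker()

# No queue subscription and nobody connected: the message is discarded.
lost = broker.publish("try-me", "first")

# Once the queue subscribes to the topic, messages are held for later.
broker.create_queue("try-me-queue")
broker.add_queue_subscription("try-me-queue", "try-me")
held = broker.publish("try-me", "second")
received = broker.consume("try-me-queue")
```

In this model the publisher never names the queue; it is the queue's subscription that decides what gets held.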

This is a sampler of various permutations of a persistent publisher.

I hope that helps.

Good morning @amackenzie, thank you for the quick reply again.

I understand the concepts and how a more advanced setup should be done within Solace… I have run through many videos on YouTube about the taxi app etc.

For my use case, however, I want a simple message sent to a queue, where it sits awaiting a poll…

Using the more advanced Try-Me tool within the web app under manage, this is possible… Within the left-hand pane I select the queue radio button and send a message… Then within the right-hand pane I connect and subscribe to my queue, and the messages are retrieved…

I have used the sample code for a persistent publisher sending a non-blocking string message, and unfortunately it is still sending the message to a topic, not the queue. If I am connected to the broker I receive the message… if I am not, it is lost.

Please see full code:

from solace.messaging.messaging_service import MessagingService, RetryStrategy
from solace.messaging.config.transport_security_strategy import TLS
from solace.messaging.resources.topic import Topic
from solace.messaging.utils.converter import ObjectToBytes
from solace.messaging.utils.manageable import Metric

from solace.messaging.publisher.persistent_message_publisher import PersistentMessagePublisher,MessagePublishReceiptListener
import time
from string import Template

class SolaceConstants:
	"""this class contains all the constants used through out the project"""
	TOPIC_ENDPOINT_DEFAULT = "try-me-queue"
	# QUEUE_NAME_FORMAT = Template('Q/$iteration')
	# TOPIC_ENDPOINT_1 = "try-me1"
	# TOPIC_ENDPOINT_2 = "try-me2"
	# APPLICATION_MESSAGE_ID = "Solace-App"
	MESSAGE_TO_SEND = "hello Solace...123"
	# ENCODING_TYPE = "utf-8"
	# CUSTOM_PROPS =  {"language": "en-CA", "isEncrypted": "True"}
	# TOPIC_ENDPOINT = "purchase/tickets"
	# GROUP_NAME1 = "analytics"
	# GROUP_NAME2 = "booking"
	# MESSAGE_PRIORITY = 1
	# MESSAGE_EXPIRATION = 5000
	# MESSAGE_SEQUENCE_NUMBER = 12345
	# CLIENT_CERTIFICATE_FILE = 'api-client.pem'
	# KEY_STORE_PASSWORD = "changeme"
	# DEFAULT_TIMEOUT_MS = 5000

class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
	def __init__(self):
		self._publish_count = 0

	@property
	def get_publish_count(self):
		return self._publish_count

	def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
		with lock:
			self._publish_count += 1
			print(f"\tMessage: {publish_receipt.message}\n"
				  f"\tIs persisted: {publish_receipt.is_persisted}\n"
				  f"\tTimestamp: {publish_receipt.time_stamp}\n"
				  f"\tException: {publish_receipt.exception}\n")
			if publish_receipt.user_context:
				print(f'\tUsercontext received: {publish_receipt.user_context.get_custom_message}')



class HowToDirectPublishMessage:
	def direct_message_publish(messaging_service: MessagingService, destination, message):
		""" to publish str or byte array type message"""

		try:
			direct_publish_service = messaging_service.create_direct_message_publisher_builder(). \
				on_back_pressure_reject(buffer_capacity=0).build()
			pub_start = direct_publish_service.start_async()
			pub_start.result()
			direct_publish_service.publish(destination=destination, message=message)
		finally:
			direct_publish_service.terminate()


	def create_persistent_message_publisher(service: MessagingService) -> 'PersistentMessagePublisher':
		"""method to create, build and start persistent publisher"""
		publisher: PersistentMessagePublisher = service.create_persistent_message_publisher_builder().build()
		publisher.start_async()
		print('PERSISTENT publisher started')
		return publisher

	def publish_string_message_non_blocking(message_publisher: PersistentMessagePublisher, destination: Topic, message):
		"""method to publish string message using persistent message publisher, non-blocking"""
		publish_receipt_listener = MessagePublishReceiptListenerImpl()
		message_publisher.set_message_publish_receipt_listener(publish_receipt_listener)
		message_publisher.publish(message, destination)
		print(f'PERSISTENT publish message is successful... Topic: [{destination.get_name()}]')
		time.sleep(2)
		print(f'Publish receipt count: {publish_receipt_listener.get_publish_count}\n')


constants = SolaceConstants

broker_props = {
  "solace.messaging.transport.host": "tcps://**xxxx**.messaging.solace.cloud:55443",
  "solace.messaging.service.vpn-name": "test-service",
  "solace.messaging.authentication.scheme.basic.username": "solace-cloud-client",
  "solace.messaging.authentication.scheme.basic.password": "**xxxx**",
  }

transport_security = TLS.create() \
  .with_certificate_validation(True, validate_server_name=False,
		trust_store_file_path="./pem/")

messaging_service = MessagingService.builder().from_properties(broker_props)\
  .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20,3))\
  .with_transport_security_strategy(transport_security).build() 




try:
	messaging_service.connect()
	print(f'Message service is connected? {messaging_service.is_connected}')
			
	topic = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)

	#print("Execute Direct Publish - String")
	#HowToDirectPublishMessage.direct_message_publish(messaging_service, topic, constants.MESSAGE_TO_SEND)
	
	publisher = HowToDirectPublishMessage.create_persistent_message_publisher(messaging_service)
	HowToDirectPublishMessage.publish_string_message_non_blocking(publisher, topic, constants.MESSAGE_TO_SEND)
	
finally:
	publisher.terminate()
	messaging_service.disconnect()

Upon running:

Thank you for your support :smile:

FYI, the exact setup I am trying to mimic in Python:

I have resolved the publisher error: I was missing the lock = threading.Lock() variable, so the lock was skipped during the for loop…
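As a side note on the lock, one way to avoid depending on a module-level variable is to keep the lock on the listener instance. The sketch below is an assumption-laden illustration: `CountingReceiptListener` is a hypothetical name, it deliberately does not inherit from MessagePublishReceiptListener so that it runs without the Solace library, and plain threads stand in for the API's callback thread.

```python
import threading


class CountingReceiptListener:
    """Counts receipts; the lock is an instance attribute, not a global."""

    def __init__(self):
        self._lock = threading.Lock()
        self._publish_count = 0

    @property
    def publish_count(self):
        with self._lock:
            return self._publish_count

    def on_publish_receipt(self, publish_receipt):
        # Guard the counter so concurrent callbacks cannot lose an update.
        with self._lock:
            self._publish_count += 1


listener = CountingReceiptListener()


def deliver_receipts(n):
    # Stand-in for the API invoking the listener once per published message.
    for i in range(n):
        listener.on_publish_receipt({"message": i})


threads = [threading.Thread(target=deliver_receipts, args=(1000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(listener.publish_count)  # 4000
```

With the lock created in `__init__`, forgetting a global `lock = threading.Lock()` can no longer break the callback.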

However, my message is still only acting as a direct message, not a guaranteed one, and I still need to resolve this.

Thank you.

You need the queue to subscribe to that topic. In PubSub+ Manager, go to the queue, select the “subscriptions” tab and add the topic string (which can also include wildcards) so that messages published to the broker on matching topics are “attracted” to that queue.
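To illustrate how a wildcard subscription might "attract" topics, here is a simplified matcher. It is a sketch of the general idea only, not the broker's implementation: `topic_matches` is a hypothetical helper, the example topic names are made up, and it assumes `*` is used at the end of a level (matching the rest of that level) and `>` is used as the final level (matching one or more remaining levels).

```python
def topic_matches(subscription: str, topic: str) -> bool:
    """Simplified model of matching a topic against a queue subscription."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")

    for i, sub in enumerate(sub_levels):
        if sub == ">" and i == len(sub_levels) - 1:
            # '>' needs at least one topic level left to match.
            return len(topic_levels) > i
        if i >= len(topic_levels):
            # The subscription has more levels than the topic.
            return False
        if sub.endswith("*"):
            # 'abc*' matches any level that starts with 'abc'.
            if not topic_levels[i].startswith(sub[:-1]):
                return False
        elif sub != topic_levels[i]:
            return False
    # Without a trailing '>', the level counts must be equal.
    return len(sub_levels) == len(topic_levels)


exact = topic_matches("try-me-queue", "try-me-queue")
single_level = topic_matches("orders/*/new", "orders/eu/new")
multi_level = topic_matches("orders/>", "orders/eu/new")
too_short = topic_matches("orders/>", "orders")
```

An exact topic string, as used in this thread, is simply the case with no wildcards at all.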

Hi @amackenzie, perfect, that’s the missing piece of the jigsaw, thank you very much :smile: :smile:

For reference, the current working code:

from solace.messaging.messaging_service import MessagingService, RetryStrategy
from solace.messaging.config.transport_security_strategy import TLS
from solace.messaging.resources.topic import Topic
from solace.messaging.utils.manageable import Metric
from solace.messaging.publisher.persistent_message_publisher import PersistentMessagePublisher, MessagePublishReceiptListener
import threading
import time
from datetime import datetime

lock = threading.Lock()

class SolaceConstants:
    TOPIC_ENDPOINT_DEFAULT = "try-me-queue"
    MESSAGE_TO_SEND = "Test Message: "

class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
    def __init__(self):
        self._publish_count = 0
    @property
    def get_publish_count(self):
        return self._publish_count

    def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
        with lock:
            self._publish_count += 1
            print(f"\tMessage: {publish_receipt.message}\n"
                f"\tIs persisted: {publish_receipt.is_persisted}\n"
                f"\tTimestamp: {publish_receipt.time_stamp}\n"
                f"\tException: {publish_receipt.exception}\n")
            if publish_receipt.user_context:
                print(f'\tUsercontext received: {publish_receipt.user_context.get_custom_message}')

class ClsMessagePublisher:
    def create_persistent_message_publisher(service: MessagingService) -> 'PersistentMessagePublisher':
        """method to create, build and start persistent publisher"""
        publisher: PersistentMessagePublisher = service.create_persistent_message_publisher_builder().build()
        publisher.start_async()
        print('PERSISTENT publisher started')
        return publisher

    def publish_byte_message_blocking_waiting_for_publisher_confirmation(messaging_service: MessagingService,
                                                                        message_publisher: PersistentMessagePublisher,
                                                                        destination: Topic, message, time_out):
        """method to publish message using persistent message publisher using blocking publish, i.e
        publish_await_acknowledgement and wait for the publisher confirmation"""
        publish_receipt_listener = MessagePublishReceiptListenerImpl()
        message_publisher.set_message_publish_receipt_listener(publish_receipt_listener)
        message_publisher.publish_await_acknowledgement(message, destination, time_out)
        time.sleep(2)
        metrics = messaging_service.metrics()
        print(f'Published message count: {metrics.get_value(Metric.PERSISTENT_MESSAGES_SENT)}\n')
    
constants = SolaceConstants

broker_props = {
    "solace.messaging.transport.host": "tcps://**xxxx**.messaging.solace.cloud:55443",
    "solace.messaging.service.vpn-name": "test-service",
    "solace.messaging.authentication.scheme.basic.username": "solace-cloud-client",
    "solace.messaging.authentication.scheme.basic.password": "**xxxx**",
}

## Pem key stored in sub folder 'pem'
transport_security = TLS.create().with_certificate_validation(True, validate_server_name=False,trust_store_file_path="./pem/")

messaging_service = MessagingService.builder().from_properties(broker_props)\
.with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20,3))\
.with_transport_security_strategy(transport_security).build() 

publisher = None  # defined up front so the finally block can test it safely
try:
    messaging_service.connect()
    print(f'Message service is connected? {messaging_service.is_connected}')
    topic = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)
    message = constants.MESSAGE_TO_SEND

    ## Declare a publisher
    publisher = ClsMessagePublisher.create_persistent_message_publisher(messaging_service)
    ## Send message (Message + Datetime for testing)
    ClsMessagePublisher.publish_byte_message_blocking_waiting_for_publisher_confirmation(
        messaging_service=messaging_service,
        message_publisher=publisher,
        destination=topic,
        message=message + str(datetime.now()),
        time_out=2000)
except Exception as e:
    print("Error occurred:", e)
finally:
    try:
        # Only terminate the publisher if it was actually created.
        if publisher is not None:
            publisher.terminate()
        messaging_service.disconnect()
    except Exception as e1:
        print("Error occurred closing:", e1)