Python and Publishing to a Queue Using Solace's API
Good morning all,
Continuing my Solace journey today: after a lot of reading, I cannot find anywhere that details how to publish to an actual queue from the Python API.
I have seen a single example that uses a different API (SEMP), but it doesn't make sense to me to use two different connection methodologies, so I am pursuing the Solace API itself.
After yesterday's faff of connecting to the cloud, I can now use the demos from your Git repository with 'relative' ease by modifying the SolaceConstants. However, none of the demos show publishing to a queue where the message will sit and await the subscriber. They all just talk to a topic, which is non-persistent.
i.e.
send message -> no subscriber -> bin it
I am looking for the settings as shown below:
send message -> hold in queue -> subscribe and get message
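To make the two behaviours above concrete, here is a toy in-memory sketch. It is not the Solace API: `ToyBroker` and its methods are hypothetical names used only to contrast "discard when nobody is listening" with "hold until consumed".

```python
from collections import deque


class ToyBroker:
    """Toy in-memory model of the two delivery behaviours (NOT the Solace API)."""

    def __init__(self):
        self.live_subscribers = []  # callbacks of currently connected subscribers
        self.queue = deque()        # stands in for a durable queue
        self.discarded = 0          # messages dropped because nobody was listening

    def publish_direct(self, message):
        # Direct delivery: only subscribers connected right now receive it.
        if not self.live_subscribers:
            self.discarded += 1
            return
        for callback in self.live_subscribers:
            callback(message)

    def publish_to_queue(self, message):
        # Queued delivery: the message is held until a consumer takes it.
        self.queue.append(message)

    def consume(self):
        # Return the oldest held message, or None if the queue is empty.
        return self.queue.popleft() if self.queue else None


broker = ToyBroker()
broker.publish_direct("hello")    # no subscriber connected, so it is dropped
broker.publish_to_queue("hello")  # held until someone consumes it
held_message = broker.consume()
```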
Which API call or demo shows the send-message part of this?
For current settings:
- I have a queue set up, matching your Try-Me, called 'try-me-queue'
The class currently being used from your demo to send to a topic (it doesn't seem to work with a queue):
class HowToDirectPublishMessage:
    def direct_message_publish(messaging_service: MessagingService, destination, message):
        """ to publish str or byte array type message"""
        try:
            direct_publish_service = messaging_service.create_direct_message_publisher_builder(). \
                on_back_pressure_reject(buffer_capacity=0).build()
            pub_start = direct_publish_service.start_async()
            pub_start.result()
            direct_publish_service.publish(destination=destination, message=message)
        finally:
            direct_publish_service.terminate()
This is called by the main func:
destination_name = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)
HowToDirectPublishMessage.direct_message_publish(messaging_service, destination_name, constants.MESSAGE_TO_SEND)
Is there maybe a Queue.of('My Queue Name Here') option which is not obvious?
Thank you.
Answers
-
It's important to know the core messaging concepts of PubSub+ in order to know which APIs to apply to your use cases.
https://docs.solace.com/Basics/Core-Concepts.htm
While you can publish directly to a queue, it is generally a best practice to publish to a topic as a topic can be subscribed to by 1 or more queues making it a true pub/sub and offering much more flexibility.
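The flexibility described above can be sketched with a toy model in which one published topic fans out to every queue subscribed to it. This is only an illustration, not the Solace API; the class, the queue names `audit-queue` and `billing-queue`, and the topic `orders/created` are all hypothetical.

```python
from collections import defaultdict, deque


class ToyFanOutBroker:
    """Toy model of queues subscribing to topics (NOT the Solace API)."""

    def __init__(self):
        self.queues = defaultdict(deque)       # queue name -> held messages
        self.subscriptions = defaultdict(set)  # topic -> names of subscribed queues

    def subscribe_queue(self, queue_name, topic):
        # Record that the queue should attract messages published to the topic.
        self.queues[queue_name]  # touching the key creates an empty queue
        self.subscriptions[topic].add(queue_name)

    def publish(self, topic, message):
        # Copy the message into every queue subscribed to this exact topic.
        targets = self.subscriptions.get(topic, set())
        for queue_name in targets:
            self.queues[queue_name].append(message)
        return len(targets)


fan_out = ToyFanOutBroker()
fan_out.subscribe_queue("audit-queue", "orders/created")
fan_out.subscribe_queue("billing-queue", "orders/created")
delivered_to = fan_out.publish("orders/created", "order 1")
unmatched = fan_out.publish("orders/cancelled", "order 2")
```

The publisher only names a topic; which queues receive a copy is decided by the subscriptions, so consumers can be added later without touching the publishing code.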
And you can publish to a topic in a persistent way (where you are using the guaranteed messaging facilities) using a persistent publisher.
This is a sampler of various permutations of a persistent publisher.
I hope that helps.
0 -
Good morning @amackenzie , Thank you for the quick reply again.
I understand the concepts and how a more advanced setup should be done within Solace; I have run through many videos on YouTube about the taxi app, etc.
For my use case, however, I want a simple message sent to a queue, where it sits awaiting a poll.
Using the more advanced Try-Me tool within the web app under Manage, this is possible: in the left-hand pane I select the queue radio button and send a message, then in the right-hand pane I connect, subscribe to my queue, and the messages are retrieved.
I have used the sample code for publishing a string with the non-blocking persistent publisher, and unfortunately it is still sending the message to a topic, not the queue. If I am connected to the broker I receive the message; if I am not, it is lost.
Please see full code:
from solace.messaging.messaging_service import MessagingService,RetryStrategy
from solace.messaging.config.transport_security_strategy import TLS
from solace.messaging.config import _sol_constants
from solace.messaging.messaging_service import MessagingService
from solace.messaging.resources.topic import Topic
from solace.messaging.utils.converter import ObjectToBytes
from solace.messaging.utils.manageable import Metric
from solace.messaging.publisher.persistent_message_publisher import PersistentMessagePublisher,MessagePublishReceiptListener
import time
from string import Template


class SolaceConstants:
    """this class contains all the constants used through out the project"""
    TOPIC_ENDPOINT_DEFAULT = "try-me-queue"
    # QUEUE_NAME_FORMAT = Template('Q/$iteration')
    # TOPIC_ENDPOINT_1 = "try-me1"
    # TOPIC_ENDPOINT_2 = "try-me2"
    # APPLICATION_MESSAGE_ID = "Solace-App"
    MESSAGE_TO_SEND = "hello Solace...123"
    # ENCODING_TYPE = "utf-8"
    # CUSTOM_PROPS = {"language": "en-CA", "isEncrypted": "True"}
    # TOPIC_ENDPOINT = "purchase/tickets"
    # GROUP_NAME1 = "analytics"
    # GROUP_NAME2 = "booking"
    # MESSAGE_PRIORITY = 1
    # MESSAGE_EXPIRATION = 5000
    # MESSAGE_SEQUENCE_NUMBER = 12345
    # CLIENT_CERTIFICATE_FILE = 'api-client.pem'
    # KEY_STORE_PASSWORD = "changeme"
    # DEFAULT_TIMEOUT_MS = 5000


class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
    def __init__(self):
        self._publish_count = 0

    @property
    def get_publish_count(self):
        return self._publish_count

    def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
        with lock:
            self._publish_count += 1
            print(f"\tMessage: {publish_receipt.message}\n"
                  f"\tIs persisted: {publish_receipt.is_persisted}\n"
                  f"\tTimestamp: {publish_receipt.time_stamp}\n"
                  f"\tException: {publish_receipt.exception}\n")
            if publish_receipt.user_context:
                print(f'\tUsercontext received: {publish_receipt.user_context.get_custom_message}')


class HowToDirectPublishMessage:
    def direct_message_publish(messaging_service: MessagingService, destination, message):
        """ to publish str or byte array type message"""
        try:
            direct_publish_service = messaging_service.create_direct_message_publisher_builder(). \
                on_back_pressure_reject(buffer_capacity=0).build()
            pub_start = direct_publish_service.start_async()
            pub_start.result()
            direct_publish_service.publish(destination=destination, message=message)
        finally:
            direct_publish_service.terminate()

    def create_persistent_message_publisher(service: MessagingService) -> 'PersistentMessagePublisher':
        """method to create, build and start persistent publisher"""
        publisher: PersistentMessagePublisher = service.create_persistent_message_publisher_builder().build()
        publisher.start_async()
        print('PERSISTENT publisher started')
        return publisher

    def publish_string_message_non_blocking(message_publisher: PersistentMessagePublisher, destination: Topic, message):
        """method to publish string message using persistent message publisher, non-blocking"""
        publish_receipt_listener = MessagePublishReceiptListenerImpl()
        message_publisher.set_message_publish_receipt_listener(publish_receipt_listener)
        message_publisher.publish(message, destination)
        print(f'PERSISTENT publish message is successful... Topic: [{destination.get_name()}]')
        time.sleep(2)
        print(f'Publish receipt count: {publish_receipt_listener.get_publish_count}\n')


constants = SolaceConstants

broker_props = {
    "solace.messaging.transport.host": "tcps://**xxxx**.messaging.solace.cloud:55443",
    "solace.messaging.service.vpn-name": "test-service",
    "solace.messaging.authentication.scheme.basic.username": "solace-cloud-client",
    "solace.messaging.authentication.scheme.basic.password": "**xxxx**",
}

transport_security = TLS.create() \
    .with_certificate_validation(True, validate_server_name=False, trust_store_file_path="./pem/")

messaging_service = MessagingService.builder().from_properties(broker_props)\
    .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20,3))\
    .with_transport_security_strategy(transport_security).build()

try:
    messaging_service.connect()
    print(f'Message service is connected? {messaging_service.is_connected}')
    topic = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)

    #print("Execute Direct Publish - String")
    #HowToDirectPublishMessage.direct_message_publish(messaging_service, topic, constants.MESSAGE_TO_SEND)

    publisher = HowToDirectPublishMessage.create_persistent_message_publisher(messaging_service)
    HowToDirectPublishMessage.publish_string_message_non_blocking(publisher, topic, constants.MESSAGE_TO_SEND)
finally:
    publisher.terminate()
    messaging_service.disconnect()
Up running:
Thank you for your support
0 -
I have resolved the publisher error: I was missing the lock = threading.Lock() variable, so the lock was skipped during the for loop.
However, my message is still only acting as a direct message, not a guaranteed one, and I still need to resolve this.
Thank you.
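For reference, the earlier listing used `with lock:` inside `on_publish_receipt` without ever defining `lock`, which raises a `NameError` when the callback runs. Below is a minimal sketch of a counter that owns its lock, so nothing depends on a module-level variable. `ReceiptCounter` and `worker` are hypothetical names; the sketch assumes only that the callback may be invoked from a thread other than the one reading the count.

```python
import threading


class ReceiptCounter:
    """Thread-safe counter that keeps its lock as an instance attribute."""

    def __init__(self):
        self._lock = threading.Lock()
        self._count = 0

    def on_receipt(self):
        # Guard the read-modify-write so concurrent callbacks cannot lose updates.
        with self._lock:
            self._count += 1

    @property
    def count(self):
        with self._lock:
            return self._count


def worker(counter, repetitions):
    # Simulates a thread delivering several receipts in a row.
    for _ in range(repetitions):
        counter.on_receipt()


counter = ReceiptCounter()
threads = [threading.Thread(target=worker, args=(counter, 1000)) for _ in range(4)]
for thread in threads:
    thread.start()
for thread in threads:
    thread.join()
```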
0 -
You need the queue to subscribe to that topic. In PubSub+ Manager, go to the queue and select the "subscriptions" tab and add the topic string (can also include wildcards) so that matching topics published to the broker are "attracted" to that queue.
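As a rough illustration of how a subscription with wildcards "attracts" topics, here is a simplified matcher. It is a sketch of my understanding of the PubSub+ wildcard rules (`*` as a prefix wildcard within one level, `>` as the final level matching one or more remaining levels), not the broker's actual implementation. `topic_matches` is a hypothetical helper, and the real rules have more corner cases than this covers.

```python
def topic_matches(subscription: str, topic: str) -> bool:
    """Simplified check of a topic against a subscription with * and > wildcards."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for index, sub_level in enumerate(sub_levels):
        is_last = index == len(sub_levels) - 1
        if sub_level == ">" and is_last:
            # '>' as the final level needs at least one remaining topic level.
            return len(topic_levels) > index
        if index >= len(topic_levels):
            return False  # the topic has fewer levels than the subscription
        level = topic_levels[index]
        if sub_level.endswith("*"):
            # 'abc*' matches any single level that starts with 'abc'.
            if not level.startswith(sub_level[:-1]):
                return False
        elif sub_level != level:
            return False
    # Without a trailing '>', both must have the same number of levels.
    return len(sub_levels) == len(topic_levels)


exact = topic_matches("try-me-queue", "try-me-queue")
single_level = topic_matches("orders/*/created", "orders/eu/created")
multi_level = topic_matches("orders/>", "orders/eu/created")
```

With the setup in this thread, the subscription added to the queue is simply the exact topic string the publisher uses, so no wildcard is needed.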
1 -
Hi @amackenzie, perfect, that's the missing piece of the jigsaw. Thank you very much.
For ref, current working code:
from solace.messaging.messaging_service import MessagingService,RetryStrategy
from solace.messaging.config.transport_security_strategy import TLS
from solace.messaging.messaging_service import MessagingService
from solace.messaging.resources.topic import Topic
from solace.messaging.utils.manageable import Metric
from solace.messaging.publisher.persistent_message_publisher import PersistentMessagePublisher,MessagePublishReceiptListener
import time,threading
from datetime import datetime
from string import Template

lock = threading.Lock()


class SolaceConstants:
    TOPIC_ENDPOINT_DEFAULT = "try-me-queue"
    MESSAGE_TO_SEND = "Test Message: "


class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
    def __init__(self):
        self._publish_count = 0

    @property
    def get_publish_count(self):
        return self._publish_count

    def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
        with lock:
            self._publish_count += 1
            print(f"\tMessage: {publish_receipt.message}\n"
                  f"\tIs persisted: {publish_receipt.is_persisted}\n"
                  f"\tTimestamp: {publish_receipt.time_stamp}\n"
                  f"\tException: {publish_receipt.exception}\n")
            if publish_receipt.user_context:
                print(f'\tUsercontext received: {publish_receipt.user_context.get_custom_message}')


class ClsMessagePublisher:
    def create_persistent_message_publisher(service: MessagingService) -> 'PersistentMessagePublisher':
        """method to create, build and start persistent publisher"""
        publisher: PersistentMessagePublisher = service.create_persistent_message_publisher_builder().build()
        publisher.start_async()
        print('PERSISTENT publisher started')
        return publisher

    def publish_byte_message_blocking_waiting_for_publisher_confirmation(messaging_service: MessagingService,
                                                                         message_publisher: PersistentMessagePublisher,
                                                                         destination: Topic, message, time_out):
        """method to publish message using persistent message publisher using blocking publish, i.e
        publish_await_acknowledgement and wait for the publisher confirmation"""
        publish_receipt_listener = MessagePublishReceiptListenerImpl()
        message_publisher.set_message_publish_receipt_listener(publish_receipt_listener)
        message_publisher.publish_await_acknowledgement(message, destination, time_out)
        time.sleep(2)
        metrics = messaging_service.metrics()
        print(f'Published message count: {metrics.get_value(Metric.PERSISTENT_MESSAGES_SENT)}\n')


constants = SolaceConstants

broker_props = {
    "solace.messaging.transport.host": "tcps://mr80jp5da7c9s.messaging.solace.cloud:55443",
    "solace.messaging.service.vpn-name": "test-service",
    "solace.messaging.authentication.scheme.basic.username": "solace-cloud-client",
    "solace.messaging.authentication.scheme.basic.password": "3d1njv3rmkf6fa60esu6hpv6nq",
}

## Pem key stored in sub folder 'pem'
transport_security = TLS.create().with_certificate_validation(True, validate_server_name=False,trust_store_file_path="./pem/")

messaging_service = MessagingService.builder().from_properties(broker_props)\
    .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20,3))\
    .with_transport_security_strategy(transport_security).build()

try:
    messaging_service.connect()
    print(f'Message service is connected? {messaging_service.is_connected}')
    topic = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)
    message = constants.MESSAGE_TO_SEND

    ## Declare a publisher
    publisher = ClsMessagePublisher.create_persistent_message_publisher(messaging_service)

    ## Send message (Message + Datetime for testing)
    ClsMessagePublisher.publish_byte_message_blocking_waiting_for_publisher_confirmation(
        messaging_service=messaging_service,
        message_publisher=publisher,
        destination=topic,
        message=message+str(datetime.now()),
        time_out=2000)
except Exception as e:
    print("Error occurred:", e)
finally:
    try:
        publisher.terminate()
        messaging_service.disconnect()
    except Exception as e1:
        print("Error occurred closing:", e1)
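The listing above covers only the publishing side. For the "subscribe and get message" half, a sketch along the following lines may work. It assumes the Python API's persistent receiver (`create_persistent_message_receiver_builder`, `Queue.durable_exclusive_queue`, `receive_message`, `ack`, `get_payload_as_string`) behaves as I understand it, and that the queue already exists on the broker. `build_queue_receiver` and `drain` are hypothetical helper names, not part of the Solace samples.

```python
def build_queue_receiver(messaging_service, queue_name: str):
    """Bind a persistent receiver to an existing durable queue (assumed API usage)."""
    # Imported here so that drain() below can be tried without the
    # solace-pubsubplus package installed.
    from solace.messaging.resources.queue import Queue

    queue = Queue.durable_exclusive_queue(queue_name)
    receiver = messaging_service.create_persistent_message_receiver_builder().build(queue)
    receiver.start()
    return receiver


def drain(receiver, max_messages: int = 10, timeout_ms: int = 1000):
    """Pull up to max_messages from the receiver, acknowledging each one."""
    payloads = []
    for _ in range(max_messages):
        # Assumption: receive_message returns None when the timeout expires.
        message = receiver.receive_message(timeout_ms)
        if message is None:
            break
        payloads.append(message.get_payload_as_string())
        # Acknowledge so the broker can remove the message from the queue.
        receiver.ack(message)
    return payloads
```

With a connected service this would be used roughly as `receiver = build_queue_receiver(messaging_service, "try-me-queue")` followed by `print(drain(receiver))` and `receiver.terminate()`.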
0