Good morning @amackenzie, thank you again for the quick reply.
I understand the concepts and how a more advanced setup should be done within Solace… I have run through many videos on YouTube about the taxi app etc.
For my use case, however, I want a simple message to be sent to a queue, where it sits awaiting a poll…
Using the more advanced Try-Me tool within the web app under Manage, this is possible… in the left-hand pane I select the queue radio button and send a message… then in the right-hand pane I connect and subscribe to my queue, and the messages are retrieved…
I have used the sample code for publishing a string with the non-blocking persistent publisher, and unfortunately it is still sending the message to a topic, not the queue. If I am connected to the broker I receive the message… if I am not, it is lost.
Please see full code:
import threading
import time
from string import Template

from solace.messaging.messaging_service import MessagingService,RetryStrategy
from solace.messaging.config.transport_security_strategy import TLS
from solace.messaging.config import _sol_constants
from solace.messaging.messaging_service import MessagingService
from solace.messaging.resources.topic import Topic
from solace.messaging.utils.converter import ObjectToBytes
from solace.messaging.utils.manageable import Metric
from solace.messaging.publisher.persistent_message_publisher import PersistentMessagePublisher,MessagePublishReceiptListener
class SolaceConstants:
    """Literal values shared by the publishing script below."""

    # Text payload of the single test message that gets published.
    MESSAGE_TO_SEND = "hello Solace...123"

    # Destination name; the script wraps it with Topic.of() before publishing.
    TOPIC_ENDPOINT_DEFAULT = "try-me-queue"

    # Constants carried over from the sample code but not used by this script:
    # QUEUE_NAME_FORMAT = Template('Q/$iteration')
    # TOPIC_ENDPOINT_1 = "try-me1"
    # TOPIC_ENDPOINT_2 = "try-me2"
    # APPLICATION_MESSAGE_ID = "Solace-App"
    # ENCODING_TYPE = "utf-8"
    # CUSTOM_PROPS = {"language": "en-CA", "isEncrypted": "True"}
    # TOPIC_ENDPOINT = "purchase/tickets"
    # GROUP_NAME1 = "analytics"
    # GROUP_NAME2 = "booking"
    # MESSAGE_PRIORITY = 1
    # MESSAGE_EXPIRATION = 5000
    # MESSAGE_SEQUENCE_NUMBER = 12345
    # CLIENT_CERTIFICATE_FILE = 'api-client.pem'
    # KEY_STORE_PASSWORD = "changeme"
    # DEFAULT_TIMEOUT_MS = 5000
class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
    """Publish-receipt listener that counts and prints every receipt it receives."""

    def __init__(self):
        self._publish_count = 0
        # The callback previously entered `with lock:` on a global that this
        # file never defines, so each receipt raised NameError. Every listener
        # now owns the lock that guards its counter.
        self._lock = threading.Lock()

    @property
    def get_publish_count(self):
        """Number of receipts handled so far (read as an attribute, not called)."""
        return self._publish_count

    def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
        """Count the receipt and print its details.

        Args:
            publish_receipt: receipt object handed over by the publisher; its
                message, is_persisted, time_stamp, exception and user_context
                attributes are read here.
        """
        # Counting and printing happen under the lock so output from
        # concurrent callbacks is not interleaved.
        with self._lock:
            self._publish_count += 1
            print(f"\tMessage: {publish_receipt.message}\n"
                  f"\tIs persisted: {publish_receipt.is_persisted}\n"
                  f"\tTimestamp: {publish_receipt.time_stamp}\n"
                  f"\tException: {publish_receipt.exception}\n")
            if publish_receipt.user_context:
                print(f'\tUsercontext received: {publish_receipt.user_context.get_custom_message}')
class HowToDirectPublishMessage:
    """Namespace of helpers for publishing one message, direct or persistent.

    None of the helpers use instance state, so they are declared static; the
    existing class-level calls keep working and instance-level calls no longer
    pass the instance as the first argument.
    """

    @staticmethod
    def direct_message_publish(messaging_service: MessagingService, destination, message):
        """Publish a str or byte-array message with a short-lived direct publisher.

        Args:
            messaging_service: service used to build the direct publisher.
            destination: destination passed through to ``publish``.
            message: payload passed through to ``publish``.
        """
        # Build outside the try block: if the builder itself fails there is no
        # publisher to terminate, and the original error must not be replaced
        # by an unbound-name error raised from the cleanup.
        direct_publish_service = messaging_service.create_direct_message_publisher_builder() \
            .on_back_pressure_reject(buffer_capacity=0).build()
        try:
            # Wait for the asynchronous start to finish before publishing.
            direct_publish_service.start_async().result()
            direct_publish_service.publish(destination=destination, message=message)
        finally:
            direct_publish_service.terminate()

    @staticmethod
    def create_persistent_message_publisher(service: MessagingService) -> 'PersistentMessagePublisher':
        """Create, build and start a persistent publisher.

        Args:
            service: service used to build the persistent publisher.

        Returns:
            The publisher, after its asynchronous start has completed.
        """
        publisher: PersistentMessagePublisher = service.create_persistent_message_publisher_builder().build()
        # Wait on the start result (as direct_message_publish does) so the
        # "started" message is truthful and callers do not publish too early.
        publisher.start_async().result()
        print('PERSISTENT publisher started')
        return publisher

    @staticmethod
    def publish_string_message_non_blocking(message_publisher: PersistentMessagePublisher, destination: Topic, message):
        """Publish a string message with the persistent publisher, non-blocking.

        Registers a fresh receipt listener, publishes, sleeps two seconds and
        then prints how many receipts the listener counted.

        Args:
            message_publisher: publisher used to send the message.
            destination: topic the message is published to.
            message: payload to publish.
        """
        # NOTE(review): the destination is a Topic. Presumably the broker only
        # keeps the message in a queue if that queue has a subscription
        # matching this topic - confirm the queue's subscriptions on the broker.
        publish_receipt_listener = MessagePublishReceiptListenerImpl()
        message_publisher.set_message_publish_receipt_listener(publish_receipt_listener)
        message_publisher.publish(message, destination)
        print(f'PERSISTENT publish message is successful... Topic: [{destination.get_name()}]')
        # Fixed pause before the receipt count is read and printed.
        time.sleep(2)
        print(f'Publish receipt count: {publish_receipt_listener.get_publish_count}\n')
# Script body: connect to the broker, publish one persistent message, clean up.
constants = SolaceConstants

# Connection and basic-authentication properties for the broker.
broker_props = {
    "solace.messaging.transport.host": "tcps://**xxxx**.messaging.solace.cloud:55443",
    "solace.messaging.service.vpn-name": "test-service",
    "solace.messaging.authentication.scheme.basic.username": "solace-cloud-client",
    "solace.messaging.authentication.scheme.basic.password": "**xxxx**",
}

# TLS with certificate validation against the trust store in ./pem/;
# server-name validation is switched off.
transport_security = TLS.create() \
    .with_certificate_validation(True, validate_server_name=False,
                                 trust_store_file_path="./pem/")

messaging_service = MessagingService.builder().from_properties(broker_props)\
    .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20,3))\
    .with_transport_security_strategy(transport_security).build()

# Bound before the try block so the cleanup can tell whether a publisher was
# created; otherwise a failed connect() would surface as a NameError raised
# from the finally clause instead of the real connection error.
publisher = None
try:
    messaging_service.connect()
    print(f'Message service is connected? {messaging_service.is_connected}')
    topic = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)

    #print("Execute Direct Publish - String")
    #HowToDirectPublishMessage.direct_message_publish(messaging_service, topic, constants.MESSAGE_TO_SEND)

    publisher = HowToDirectPublishMessage.create_persistent_message_publisher(messaging_service)
    HowToDirectPublishMessage.publish_string_message_non_blocking(publisher, topic, constants.MESSAGE_TO_SEND)
finally:
    if publisher is not None:
        publisher.terminate()
    messaging_service.disconnect()
Upon running:
Thank you for your support 