Hi @amackenzie, perfect — that’s the missing piece of the jigsaw. Thank you very much!
For reference, here is the current working code:
from solace.messaging.messaging_service import MessagingService,RetryStrategy
from solace.messaging.config.transport_security_strategy import TLS
from solace.messaging.messaging_service import MessagingService
from solace.messaging.resources.topic import Topic
from solace.messaging.utils.manageable import Metric
from solace.messaging.publisher.persistent_message_publisher import PersistentMessagePublisher,MessagePublishReceiptListener
import time,threading
from datetime import datetime
from string import Template
# Module-level lock taken by MessagePublishReceiptListenerImpl.on_publish_receipt
# (`with lock:`) around its receipt-counter update.
lock = threading.Lock()
class SolaceConstants:
    """Fixed values shared by the publishing script below."""

    # Leading text of the outgoing payload; the script appends the current datetime to it.
    MESSAGE_TO_SEND: str = "Test Message: "
    # Name handed to Topic.of() to build the publish destination.
    TOPIC_ENDPOINT_DEFAULT: str = "try-me-queue"
class MessagePublishReceiptListenerImpl(MessagePublishReceiptListener):
    """Publish-receipt listener that counts the receipts it is given and prints each one."""

    def __init__(self):
        # Running total of receipts passed to on_publish_receipt().
        self._publish_count = 0

    @property
    def get_publish_count(self):
        """Number of publish receipts handled so far (read as an attribute, not called)."""
        return self._publish_count

    def on_publish_receipt(self, publish_receipt: 'PublishReceipt'):
        """Count one publish receipt and print its details.

        Prints the receipt's message, persisted flag, timestamp and exception, followed by
        the user context's custom message when the receipt carries a truthy user context.
        """
        # NOTE(review): the original indentation was lost; the counter update and both
        # prints are assumed to belong inside the `with lock:` body.
        with lock:
            self._publish_count += 1
            summary = (
                f"\tMessage: {publish_receipt.message}\n"
                f"\tIs persisted: {publish_receipt.is_persisted}\n"
                f"\tTimestamp: {publish_receipt.time_stamp}\n"
                f"\tException: {publish_receipt.exception}\n"
            )
            print(summary)
            if not publish_receipt.user_context:
                return
            print(f'\tUsercontext received: {publish_receipt.user_context.get_custom_message}')
class ClsMessagePublisher:
    """Namespace for the helpers that create a persistent publisher and publish through it.

    Both helpers are static methods: they take no ``self`` and can be called either on the
    class (as the script does) or on an instance.
    """

    @staticmethod
    def create_persistent_message_publisher(service: 'MessagingService') -> 'PersistentMessagePublisher':
        """Create, build and start a persistent message publisher.

        Args:
            service: Messaging service used to obtain the persistent publisher builder.

        Returns:
            The publisher, after its start call has returned.
        """
        publisher: 'PersistentMessagePublisher' = service.create_persistent_message_publisher_builder().build()
        # Use the blocking start() rather than start_async(): the asynchronous variant returns
        # a future immediately, so the "started" message below (and any publish issued right
        # after this method returns) could run before the publisher is actually ready.
        publisher.start()
        print('PERSISTENT publisher started')
        return publisher

    @staticmethod
    def publish_byte_message_blocking_waiting_for_publisher_confirmation(messaging_service: 'MessagingService',
                                                                         message_publisher: 'PersistentMessagePublisher',
                                                                         destination: 'Topic', message, time_out):
        """Publish one message with a blocking publish and print the sent-message metric.

        Installs a fresh MessagePublishReceiptListenerImpl on the publisher, publishes with
        ``publish_await_acknowledgement`` and then prints the service's
        ``Metric.PERSISTENT_MESSAGES_SENT`` value.

        Args:
            messaging_service: Service whose metrics are read after publishing.
            message_publisher: Started persistent publisher used to send the message.
            destination: Topic the message is published to.
            message: Payload passed unchanged to ``publish_await_acknowledgement``.
            time_out: Timeout passed unchanged to ``publish_await_acknowledgement``
                (the script passes 2000; milliseconds per the Solace API docs — confirm).
        """
        publish_receipt_listener = MessagePublishReceiptListenerImpl()
        message_publisher.set_message_publish_receipt_listener(publish_receipt_listener)
        message_publisher.publish_await_acknowledgement(message, destination, time_out)
        # Fixed pause before reading metrics; presumably gives the receipt listener time to
        # be invoked and print its output — TODO confirm whether it is still needed.
        time.sleep(2)
        metrics = messaging_service.metrics()
        print(f'Published message count: {metrics.get_value(Metric.PERSISTENT_MESSAGES_SENT)}\n')
# Script body: connect to the broker over TLS, publish one timestamped test message with a
# persistent publisher, then tear everything down.
constants = SolaceConstants

# NOTE(review): the broker host and credentials are hard-coded; consider loading them from
# the environment or a config file before sharing or committing this script.
broker_props = {
    "solace.messaging.transport.host": "tcps://mr80jp5da7c9s.messaging.solace.cloud:55443",
    "solace.messaging.service.vpn-name": "test-service",
    "solace.messaging.authentication.scheme.basic.username": "solace-cloud-client",
    "solace.messaging.authentication.scheme.basic.password": "3d1njv3rmkf6fa60esu6hpv6nq",
}

# The PEM key is stored in the sub-folder 'pem'.
# NOTE(review): validate_server_name=False turns off server-name validation — confirm this
# is intended outside of testing.
transport_security = TLS.create().with_certificate_validation(True, validate_server_name=False,
                                                              trust_store_file_path="./pem/")
messaging_service = MessagingService.builder().from_properties(broker_props)\
    .with_reconnection_retry_strategy(RetryStrategy.parametrized_retry(20, 3))\
    .with_transport_security_strategy(transport_security).build()

# Bound before the try so the cleanup can tell whether a publisher was ever created
# (connect() or publisher creation may fail first).
publisher = None
try:
    messaging_service.connect()
    print(f'Message service is connected? {messaging_service.is_connected}')
    topic = Topic.of(constants.TOPIC_ENDPOINT_DEFAULT)
    message = constants.MESSAGE_TO_SEND

    # Declare a publisher
    publisher = ClsMessagePublisher.create_persistent_message_publisher(messaging_service)

    # Send message (message + datetime for testing)
    ClsMessagePublisher.publish_byte_message_blocking_waiting_for_publisher_confirmation(
        messaging_service=messaging_service,
        message_publisher=publisher,
        destination=topic,
        message=message + str(datetime.now()),
        time_out=2000)
except Exception as e:
    print("Error occurred:", e)
finally:
    # Tear down each resource independently: a missing or failing publisher must not
    # prevent the messaging service from being disconnected.
    if publisher is not None:
        try:
            publisher.terminate()
        except Exception as e1:
            print("Error occurred closing:", e1)
    try:
        messaging_service.disconnect()
    except Exception as e1:
        print("Error occurred closing:", e1)