How to monitor Solace as an external consumer

cl0udf0x
cl0udf0x Member Posts: 6

Hi,

I have inherited an application that consumes messages from an external Solace PubSub+ system, and I'm fairly new to it.

What would be the best way to monitor from a consumer perspective?

I've been thinking of the following:

  • check the auth
  • check the connection to queue
  • check the queue (is it possible to measure things like queue size etc?)

I hope to use the API via Python requests, or the Python client.

I appreciate any help you can provide.

Comments

  • cl0udf0x
    cl0udf0x Member Posts: 6
    edited January 10 #2

    I hope this helps explain what I'd like to achieve.

    Keen to get some feedback, thanks in advance.

    I've set up a local Docker PubSub+ broker using the starter guide and am running the code below to test out some queries.

    Note the following code is very rough and just for initial learning:

    import requests

    # primaryBroker, messageVpnName, apiUsername and apiPassword are defined elsewhere

    ## Check the message VPN and Auth in the process
    def check_message_vpn():
        apiUrl = f"http://{primaryBroker}:8080/SEMP/v2/__private_monitor__/msgVpns/{messageVpnName}?select=state"
        try:
            # One request, inside the try, so connection errors are caught below
            response = requests.get(apiUrl, auth=(apiUsername, apiPassword), verify=False)
            if response.status_code == 200:
                vpn_state = response.json().get("data", {}).get("state", "unknown")
                if vpn_state == "up":
                    print(f"Message VPN '{messageVpnName}' is up and responding correctly.")
                else:
                    print(f"Message VPN '{messageVpnName}' is not up. Current state: {vpn_state}")
            elif response.status_code == 401:
                print("Authentication failed. Please check your username and password.")
            else:
                print(f"Failed to fetch the state of the message VPN. Status code: {response.status_code}")
                print("Response content:", response.text)
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")

    ## Check the queue
    def check_queue_health():
        apiUrl = f"http://{primaryBroker}:8080/SEMP/v2/config/msgVpns/{messageVpnName}/queues/{queueName}"
        try:
            response = requests.get(apiUrl, auth=(apiUsername, apiPassword), verify=False)
            if response.status_code == 200:
                queue_info = response.json().get("data", {})
                # Default to None (falsy); an "unknown" string default would wrongly pass the check below
                ingress_enabled = queue_info.get("ingressEnabled")
                egress_enabled = queue_info.get("egressEnabled")
                if ingress_enabled and egress_enabled:
                    print(f"Queue '{queueName}' has ingress and egress enabled.")
                else:
                    print(f"Queue '{queueName}' is not fully enabled. Ingress: {ingress_enabled}, Egress: {egress_enabled}")
            elif response.status_code == 401:
                print("Authentication failed. Please check your username and password.")
            else:
                print(f"Failed to fetch the state of the queue. Status code: {response.status_code}")
                print("Response content:", response.text)
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")
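To make checks like these easier to test against the local Docker broker, and to alert on rather than just print, one option is to separate the HTTP call from the interpretation of its result. This is a sketch only: `CheckResult`, `interpret_vpn_response` and `interpret_queue_response` are hypothetical names, and the JSON shape (`data.state`, `data.ingressEnabled`, `data.egressEnabled`) is taken from the code above.

```python
from dataclasses import dataclass
from typing import Any, Mapping, Optional


@dataclass
class CheckResult:
    ok: bool
    detail: str


def _http_problem(status_code: int, body: Optional[Mapping[str, Any]]) -> Optional[CheckResult]:
    # Shared handling for auth failures, other non-200 statuses and missing bodies
    if status_code == 401:
        return CheckResult(False, "authentication failed")
    if status_code != 200 or body is None:
        return CheckResult(False, f"unexpected HTTP status {status_code}")
    return None


def interpret_vpn_response(status_code: int, body: Optional[Mapping[str, Any]]) -> CheckResult:
    problem = _http_problem(status_code, body)
    if problem is not None:
        return problem
    state = body.get("data", {}).get("state", "unknown")
    if state == "up":
        return CheckResult(True, "message VPN is up")
    return CheckResult(False, f"message VPN state is {state!r}")


def interpret_queue_response(status_code: int, body: Optional[Mapping[str, Any]]) -> CheckResult:
    problem = _http_problem(status_code, body)
    if problem is not None:
        return problem
    data = body.get("data", {})
    # Require an explicit True so a missing field never counts as healthy
    ingress = data.get("ingressEnabled")
    egress = data.get("egressEnabled")
    if ingress is True and egress is True:
        return CheckResult(True, "ingress and egress enabled")
    return CheckResult(False, f"ingress={ingress}, egress={egress}")
```

Inside the request functions you would call, for example, `interpret_vpn_response(response.status_code, response.json() if response.status_code == 200 else None)` and log or alert on `.detail`; only parsing JSON on a 200 avoids errors when an error response has no JSON body.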

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 668 admin

    Hey @cl0udf0x ..! Good to see you trying to use some SEMP; that's definitely the way to monitor the broker. That said, I wouldn't use anything with __private_monitor__ in the URL. It's an internal API call and might change or break in the future. Looking at that line, I think you could probably just remove that part of the URL and the call should still work… just querying VPN info. More info here: https://docs.solace.com/Admin/SEMP/Using-SEMP.htm
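For reference, the documented SEMPv2 monitoring endpoints live under `/SEMP/v2/monitor` (configuration objects are under `/SEMP/v2/config`, as in the queue check earlier in the thread). A small helper can build those URLs. It is a sketch: `semp_monitor_url` is a hypothetical name, and the host, port 8080 and plain HTTP defaults are carried over from the code above, so a production broker may need HTTPS or a different port. Object names are path segments, so they are percent-encoded, including any `/` in a queue name.

```python
from typing import Optional
from urllib.parse import quote


def semp_monitor_url(broker: str, vpn: str, queue: Optional[str] = None,
                     port: int = 8080, scheme: str = "http") -> str:
    """Build a URL under the documented SEMPv2 monitor API (/SEMP/v2/monitor)."""
    # Percent-encode names completely (safe=""), since "/" is legal in queue names
    url = f"{scheme}://{broker}:{port}/SEMP/v2/monitor/msgVpns/{quote(vpn, safe='')}"
    if queue is not None:
        url += f"/queues/{quote(queue, safe='')}"
    return url
```

Usage: `requests.get(semp_monitor_url(primaryBroker, messageVpnName, queueName), auth=(apiUsername, apiPassword))` should return the queue's monitoring object, which includes fields such as `bindCount`.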

    So: you have a consumer app (in Python), and you want to monitor this app for its "health"… performance, connection, whatever. Are you looking for the app to self-monitor? Or to write another application (in Python?) that also does SEMP to the broker to get your queue status? Or to have the consumer app itself do the SEMP? If the latter, I wouldn't recommend it… usually monitoring applications are kept separate from data applications. There are a couple of cases where you might want to do this, but I always prefer to keep things separate since they're separate things.

    It's possible to generate client libraries from our SEMPv2 API (it's described by an OpenAPI/Swagger spec), so that you can do things programmatically rather than having to parse through the raw JSON yourself. Might help?

    As for what to monitor: the majority of the monitoring would probably be done on the broker side. You say the broker is "external" as in you don't control it? But you also have SEMP access to it?

    For your app to self-monitor, there are a number of event handlers you can register with the API: e.g. a session event handler will give you connection UP/DOWN events; a flow event handler will tell you if your flow (bind to the queue) is UP/DOWN (connected) and hopefully ACTIVE (your app is live for receiving messages)… a 2nd instance of your app binding to an exclusive queue would not get the ACTIVE notification until the 1st instance unbinds and the 2nd one takes over. Within your app, you can also track your own metrics by incrementing a msgRecvd int every time you get a message, and a msgProcd variable whenever your consumer app has successfully processed and ACKed a message.
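The msgRecvd/msgProcd idea can be sketched as a small thread-safe counter object. With an async receiver, the message handler is likely called on an API-owned thread while a reporting thread reads the counters, so a lock keeps each snapshot consistent. `ConsumerMetrics` and its method names are hypothetical; `in_flight` is simply received minus processed.

```python
import threading


class ConsumerMetrics:
    """Thread-safe counters for messages received vs. successfully processed and ACKed."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self.msg_recvd = 0
        self.msg_procd = 0

    def on_received(self) -> None:
        # Call at the top of the message handler
        with self._lock:
            self.msg_recvd += 1

    def on_processed(self) -> None:
        # Call only after processing succeeded and the message was ACKed
        with self._lock:
            self.msg_procd += 1

    def snapshot(self) -> dict:
        # Read both counters under the same lock so they agree with each other
        with self._lock:
            return {
                "msg_recvd": self.msg_recvd,
                "msg_procd": self.msg_procd,
                "in_flight": self.msg_recvd - self.msg_procd,
            }
```

A steadily growing `in_flight` suggests the app is receiving but not finishing (or not ACKing) messages.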

    For monitoring the broker, you'd primarily want to watch the queue depth (am I falling behind?), and also whether there are messages in the queue to deliver but either the message send rate is 0 or the lowest message ID is not increasing (a message stuck at the front). This could indicate a problem with your consumer app. Oh, or the bind count is 0, which means your app isn't connected.
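The broker-side checks just described can be written as a pure function over two successive polls of the queue, which keeps the alerting rules testable without a broker. `QueueSample` and `queue_alerts` are hypothetical; you would fill the sample from the SEMPv2 monitor queue object (`bindCount` is one of its fields; check the monitor API reference for the exact depth, rate and message-ID fields).

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class QueueSample:
    # Hypothetical snapshot, filled from whatever SEMP monitor fields you poll
    depth: int            # messages currently in the queue
    tx_msg_rate: float    # messages/sec being delivered to consumers
    lowest_msg_id: int    # ID of the oldest message still in the queue
    bind_count: int       # number of consumer flows bound to the queue


def queue_alerts(prev: Optional[QueueSample], cur: QueueSample, max_depth: int) -> List[str]:
    """Apply the rules: not connected, falling behind, or stuck."""
    alerts = []
    if cur.bind_count == 0:
        alerts.append("no consumers bound")
    if cur.depth > max_depth:
        alerts.append(f"queue depth {cur.depth} exceeds {max_depth}")
    if cur.depth > 0:
        if cur.tx_msg_rate == 0:
            alerts.append("messages waiting but nothing is being delivered")
        # Oldest message unchanged between polls: something is stuck at the front
        if prev is not None and prev.depth > 0 and cur.lowest_msg_id <= prev.lowest_msg_id:
            alerts.append("oldest message has not moved since last poll")
    return alerts
```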

    Have you run into any issues already/previously that you're hoping to avoid with improved visibility?

  • cl0udf0x
    cl0udf0x Member Posts: 6

    Thanks so much for this detailed response. It's appreciated!

    I'll digest this and respond shortly. Thanks again!

  • cl0udf0x
    cl0udf0x Member Posts: 6

    Hi @Aaron

    Yes, the application has several Python consumers connecting to different queues. Because of its critical nature, I'd like to gather metrics to decide which are useful/feasible for alerting and possibly dashboard all of them.

    We haven't run into any specific issues, but there have been some failed reconnections and upstream networking issues (this could cover a multitude of sins, as we don't have visibility). Having the metrics you've put forward will be a win for us. Publishing a dummy message at the source and monitoring its successful arrival, to confirm all parts of the system are working, would be the icing on the cake.
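The dummy-message ("canary") idea can be sketched as a tracker that records when each canary was published and flags any that have not arrived within a timeout. This assumes the same process both records the publish and sees the arrival (or shares this table); since the source here is external, the upstream side would need to publish the canary. `CanaryTracker` is a hypothetical name, the actual publish/receive calls are omitted, and `time.monotonic` is only meaningful within one process, so across machines you would embed a wall-clock timestamp in the payload instead.

```python
import threading
import time
import uuid
from typing import Callable, Dict, List, Optional


class CanaryTracker:
    """Tracks canary messages from publish time until they arrive at the consumer."""

    def __init__(self, timeout_s: float, clock: Callable[[], float] = time.monotonic) -> None:
        self.timeout_s = timeout_s
        self._clock = clock
        self._lock = threading.Lock()
        self._pending: Dict[str, float] = {}

    def new_canary(self) -> str:
        # Embed the returned ID in the canary payload before publishing it
        canary_id = uuid.uuid4().hex
        with self._lock:
            self._pending[canary_id] = self._clock()
        return canary_id

    def on_received(self, canary_id: str) -> Optional[float]:
        # Returns end-to-end latency in seconds, or None for an unknown/duplicate ID
        with self._lock:
            sent_at = self._pending.pop(canary_id, None)
        return None if sent_at is None else self._clock() - sent_at

    def overdue(self) -> List[str]:
        # Canaries older than timeout_s that have not arrived; alert on these
        now = self._clock()
        with self._lock:
            return [cid for cid, sent in self._pending.items() if now - sent > self.timeout_s]
```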

    I tested removing `__private_monitor__` when using the API against the prod environment and it failed. I'll test on the local docker instance and let you know.

    Yes, good points for monitoring the broker:
    - queue depth
    - message send rate
    - message ack rate
    - bind count
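For the rate metrics in this list: if a value is only available as a cumulative counter (for example, a running total of messages delivered or acknowledged) rather than as a rate, a rate can be derived from two successive polls. A generic sketch; `counter_rate` is a hypothetical helper, and which SEMP fields to poll should be checked against the monitor API reference.

```python
from typing import Optional


def counter_rate(prev_count: int, prev_ts: float, cur_count: int, cur_ts: float) -> Optional[float]:
    """Average per-second rate between two polls of a cumulative counter."""
    elapsed = cur_ts - prev_ts
    if elapsed <= 0:
        return None  # same or out-of-order sample times
    delta = cur_count - prev_count
    if delta < 0:
        return None  # counter reset (e.g. broker restart or stats cleared): skip this interval
    return delta / elapsed
```

Returning `None` rather than a negative rate keeps a broker restart from showing up as a bogus dip on a dashboard.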

    I understand the approach of not mixing monitoring with the application; that's a valid point! You mentioned only one instance can be active on an exclusive queue at a time, so the session event handler, flow event handler, and active event handler must be captured by the application itself. I searched the docs but didn't find examples or references. Can you send links if you have them?

    Thank you again! :)

  • cl0udf0x
    cl0udf0x Member Posts: 6

    I've tested out a way to capture service event metrics.

    I added a service_interrupted boolean to the service event handler class:

    from solace.messaging.messaging_service import (
        ReconnectionAttemptListener,
        ReconnectionListener,
        ServiceEvent,
        ServiceInterruptionListener,
    )


    class SubServiceEventHandler(
        ReconnectionListener, ReconnectionAttemptListener, ServiceInterruptionListener
    ):
        def __init__(self):
            # Flag read by the status-check thread
            self.service_interrupted = False
    
        def on_reconnected(self, e: ServiceEvent):
            …
            …
            self.service_interrupted = False
    
        def on_reconnecting(self, e: ServiceEvent):
            …
            …
            self.service_interrupted = True

        def on_service_interrupted(self, e: ServiceEvent):
            …
            …
            self.service_interrupted = True

    Then I added a function that checks the service_interrupted flag, and a thread that runs it every minute:

    import threading
    import time

    receiver.start()
    
    # Set the message handler
    message_handler = MyMessageHandler()
    receiver.receive_async(message_handler)
    
    # Function to check the service status every minute
    def check_service_status():
        while True:
            if service_event_handler.service_interrupted:
                print("Service is interrupted")
            else:
                print("Service is running normally")
            time.sleep(60)
    
    
    # Start a thread to check the service status every minute
    status_thread = threading.Thread(target=check_service_status)
    status_thread.daemon = True
    status_thread.start()
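One possible refinement of the boolean flag is to record when the interruption started, so the once-a-minute checker can tell a brief reconnect from a long outage and alert only past a grace period, and to count how many times the connection dropped. A sketch with hypothetical names: `mark_down` would be called from `on_reconnecting`/`on_service_interrupted` and `mark_up` from `on_reconnected`. The lock is there because the API callbacks and the status thread run on different threads.

```python
import threading
import time
from typing import Callable, Optional


class InterruptionTracker:
    """Records when the service went down so a checker can alert on how long it has been down."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        self._lock = threading.Lock()
        self._down_since: Optional[float] = None
        self.interruptions = 0

    def mark_down(self) -> None:
        # Repeated calls during one outage keep the original start time
        with self._lock:
            if self._down_since is None:
                self._down_since = self._clock()
                self.interruptions += 1

    def mark_up(self) -> None:
        with self._lock:
            self._down_since = None

    def down_for(self) -> float:
        # Seconds since the current interruption began (0.0 when connected)
        with self._lock:
            return 0.0 if self._down_since is None else self._clock() - self._down_since

    def should_alert(self, grace_s: float) -> bool:
        return self.down_for() > grace_s
```

The status thread would then call `should_alert(...)` with a grace period of your choosing instead of printing on every tick.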
    

    To test this, the Solace Docker container came in handy!

    When working:

    Publisher

    Sent 'Hello, Solace! aANdP' to queue 'foo'
    Message published
    Sent 'Hello, Solace! DgNsT' to queue 'foo'
    Message published

    Subscriber

    Received message: Hello, Solace! aANdP on queue foo
    Service is running normally
    Received message: Hello, Solace! DgNsT on queue foo
    Service is running normally


    When the docker container is stopped/started:

    Subscriber

    Received message: Hello, Solace! eHWRN on queue foo
    2025-01-22 15:50:29,057 [WARNING] solace.messaging.core: [_solace_transport.py:89] [[SERVICE: 0x7f9c8b49ba60] - [APP ID: hq/656553/00000001/geywCErEZj]] {'caller_description': 'From service event callback', 'return_code': 'Ok', 'sub_code': 'SOLCLIENT_SUBCODE_COMMUNICATION_ERROR', 'error_info_sub_code': 14, 'error_info_contents': 'Peer closed socket, fd 8, cannot read'}
    on_reconnecting
    Attempting to reconnect. Error cause: {'caller_description': 'From service event callback', 'return_code': 'Ok', 'sub_code': 'SOLCLIENT_SUBCODE_COMMUNICATION_ERROR', 'error_info_sub_code': 14, 'error_info_contents': 'Peer closed socket, fd 8, cannot read'}
    Message: Peer closed socket, fd 8, cannot read
    Service is interrupted
    Service is interrupted
    Service is interrupted
    2025-01-22 15:53:09,865 [WARNING] solace.messaging.core: [_solace_transport.py:89] [[SERVICE: 0x7f9c8b49ba60] - [APP ID: hq/656553/00000001/geywCErEZj]] {'caller_description': 'From service event callback', 'return_code': 'Ok', 'sub_code': 'SOLCLIENT_SUBCODE_COMMUNICATION_ERROR', 'error_info_sub_code': 14, 'error_info_contents': 'Peer closed socket, fd 8, cannot read'}
    on_reconnected
    Error cause: {'caller_description': 'From service event callback', 'return_code': 'Ok', 'sub_code': 'SOLCLIENT_SUBCODE_COMMUNICATION_ERROR', 'error_info_sub_code': 14, 'error_info_contents': 'Peer closed socket, fd 8, cannot read'}
    Message: host 'tcp://localhost:55555', hostname 'localhost:55555' IP [::1]:55555 (host 1 of 1) (host connection attempt 1 of 1) (total reconnection attempt 1 of -1)
    Service is running normally
    Service is running normally
    Received message: Hello, Solace! HIAQw on queue foo


    While this has been fun to play around with, perhaps it doesn't add anything beyond monitoring the bind count at the broker level?

    Still can't find the docs on the session/flow/active event handlers 🔍️

    Cheers