Random "failed to retrieve" error in streaming scenario

Tuuliyo
Tuuliyo Member Posts: 3

Hello everyone,

I am currently working on a local setup where the Solace software broker runs in Docker and acts as the message broker. I receive events from an existing queue, which is not a problem in general, but in my case I emit them into a Flink job (written with PyFlink). I receive around 1000-3000 events/sec from this queue. After around 150k received events I randomly get the following error and my setup stops working (see below).

The error varies from "Failed to retrieve" to "payload of message cannot be read" or "message id not present"; each of them results in my Flink job stopping.

I want to mention that the data inserted into the Solace broker is generated by myself. Those messages are 100% bulletproof and always have a message body etc. I also tried to implement some message acknowledgement, but that doesn't work either.

Any hint on where I should look is really appreciated! 😊 For now I am kind of lost, because receiving works for so many messages, all generated by my application and validated against a schema etc.

Would like to hear from you guys!

If something is missing, just let me know :-)


Error Message:
13:11:25,779 [WARNING] solace.messaging.core: [_message.py:497] Failed to retrieve
2024-11-23 14:11:25 Caller Description: Message->process_rest_data. Error Info Sub code: [2]. Error: [Bad msg_p pointer '(nil)' in _solClient_msgHeaderMap_getString]. Sub code: [SOLCLIENT_SUBCODE_PARAM_NULL_PTR]. Return code: [Fail]
2024-11-23 14:11:25
2024-11-23 13:11:25,780 [ERROR] solace.messaging.receiver: [_persistent_message_receiver.py:348] [[SERVICE: 0xffff80e3beb0] [RECEIVER: 0xffff81f88310]] Failed to retrieve
2024-11-23 14:11:25 Caller Description: Message->process_rest_data. Error Info Sub code: [2]. Error: [Bad msg_p pointer '(nil)' in _solClient_msgHeaderMap_getString]. Sub code: [SOLCLIENT_SUBCODE_PARAM_NULL_PTR]. Return code: [Fail]
2024-11-23 14:11:25 Exception ignored on calling ctypes callback function: <bound method _PersistentMessageReceiver._flow_message_receive_callback_routine of <solace.messaging.receiver._impl._persistent_message_receiver._PersistentMessageReceiver object at 0xffff81f88310>>
2024-11-23 14:11:25 Traceback (most recent call last):
2024-11-23 14:11:25 File "/usr/local/lib/python3.10/dist-packages/solace/messaging/receiver/_impl/_persistent_message_receiver.py", line 349, in _flow_message_receive_callback_routine
2024-11-23 14:11:25 raise PubSubPlusClientError(message=exception) from exception
2024-11-23 13:11:25,781 [WARNING] solace.messaging.core.api: [_inbound_message_utility.py:192] Unable to get message id
2024-11-23 14:11:25 Caller Description: _InboundMessage->get_message_id. Error Info Sub code: [2]. Error: [Bad msg_p pointer '(nil)' in solClient_msg_getMsgId]. Sub code: [SOLCLIENT_SUBCODE_PARAM_NULL_PTR]. Return code: [Fail]
2024-11-23 14:11:25 Error during message settlement: Unable to get message id
2024-11-23 14:11:25 Caller Description: _InboundMessage->get_message_id. Error Info Sub code: [2]. Error: [Bad msg_p pointer '(nil)' in solClient_msg_getMsgId]. Sub code: [SOLCLIENT_SUBCODE_PARAM_NULL_PTR]. Return code: [Fail]

Custom MessageHandlerImpl Class:
from solace.messaging.errors.pubsubplus_client_error import PubSubPlusClientError
from solace.messaging.receiver.inbound_message import InboundMessage
from solace.messaging.receiver.message_receiver import MessageHandler
from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver


class MessageHandlerImpl(MessageHandler):
    def __init__(self, flink_source, persistent_receiver: PersistentMessageReceiver):
        self.flink_source = flink_source
        self.receiver: PersistentMessageReceiver = persistent_receiver

    def on_message(self, message: InboundMessage):
        try:
            # Prefer the string payload; fall back to the raw bytes.
            payload = message.get_payload_as_string()
            if payload is None:
                payload = message.get_payload_as_bytes()
            if isinstance(payload, bytearray):
                print(f"Received a message of type: {type(payload)}. Decoding to string")
                payload = payload.decode()

            topic = message.get_destination_name()
            print(f"Received message on: {topic}")
            # print(f"Message payload test: {payload}")

            try:
                # Attempt to emit the payload to Flink
                self.flink_source.emit(payload)
            except Exception as e:
                print(f"Error emitting message to Flink: {e}")
                # NOTE: this acknowledges the failed message as well; a negative
                # settlement (nack) would be needed to route it to the DLQ instead.
                self.receiver.ack(message)
                return

            # If everything succeeded, acknowledge the message
            self.receiver.ack(message)
        except PubSubPlusClientError as e:
            print(f"Error during message settlement: {e}")
        except Exception as e:
            print(f"Unexpected error during settlement: {e}")
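
For context, here is roughly how the handler is wired to the persistent receiver in my setup (a simplified sketch: the broker properties, the queue name "transactions-queue", and the flink_source object are placeholders, and authentication properties are omitted):

from solace.messaging.messaging_service import MessagingService
from solace.messaging.resources.queue import Queue

# Placeholder connection properties for the local Docker broker;
# authentication properties would be added here as well.
broker_props = {
    "solace.messaging.transport.host": "tcp://localhost:55555",
    "solace.messaging.service.vpn-name": "default",
}

messaging_service = MessagingService.builder().from_properties(broker_props).build()
messaging_service.connect()

# Bind a persistent receiver to the existing durable queue and register the handler.
receiver = (
    messaging_service.create_persistent_message_receiver_builder()
    .build(Queue.durable_exclusive_queue("transactions-queue"))
)
receiver.start()
# flink_source stands in for the custom Flink source object used in my job.
receiver.receive_async(MessageHandlerImpl(flink_source, receiver))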

Comments

  • Ragnar
    Ragnar Member, Employee Posts: 67 Solace Employee

    Hi Jonas,

    I cannot determine the source of the problem from the snippets here. The cut and paste appears to have removed a lot of whitespace that would make the code and logs more legible. However, if I separate the debug log by timestamps I see this:

    13:11:25,779 [WARNING] solace.messaging.core: [_message.py:497] Failed to retrieve
    2024-11-23 14:11:25 Caller Description: Message->process_rest_data. Error Info Sub code: [2]. Error: [Bad msg_p pointer '(nil)' in _solClient_msgHeaderMap_getString]. Sub code: [SOLCLIENT_SUBCODE_PARAM_NULL_PTR]. Return code: [Fail]
    2024-11-23 14:11:25
    2024-11-23 13:11:25,780 [ERROR] solace.messaging.receiver: [_persistent_message_receiver.py:348] [[SERVICE: 0xffff80e3beb0] [RECEIVER: 0xffff81f88310]] Failed to retrieve
    2024-11-23 14:11:25 Caller Description: Message->process_rest_data. Error Info Sub code: [2]. Error: [Bad msg_p pointer '(nil)' in _solClient_msgHeaderMap_getString]. Sub code: [SOLCLIENT_SUBCODE_PARAM_NULL_PTR]. Return code: [Fail]
    2024-11-23 14:11:25 Exception ignored on calling ctypes callback function: <bound method _PersistentMessageReceiver._flow_message_receive_callback_routine of <solace.messaging.receiver._impl._persistent_message_receiver._PersistentMessageReceiver object at 0xffff81f88310>>
    2024-11-23 14:11:25 Traceback (most recent call last):
    2024-11-23 14:11:25 File "/usr/local/lib/python3.10/dist-packages/solace/messaging/receiver/_impl/_persistent_message_receiver.py", line 349, in _flow_message_receive_callback_routine
    2024-11-23 14:11:25 raise PubSubPlusClientError(message=exception) from exception
    2024-11-23 13:11:25,781 [WARNING] solace.messaging.core.api: [_inbound_message_utility.py:192] Unable to get message id
    2024-11-23 14:11:25 Caller Description: _InboundMessage->get_message_id. Error Info Sub code: [2]. Error: [Bad msg_p pointer '(nil)' in solClient_msg_getMsgId]. Sub code: [SOLCLIENT_SUBCODE_PARAM_NULL_PTR]. Return code: [Fail]
    2024-11-23 14:11:25 Error during message settlement: Unable to get message id
    2024-11-23 14:11:25 Caller Description: _InboundMessage->get_message_id. Error Info Sub code: [2]. Error: [Bad msg_p pointer '(nil)' in solClient_msg_getMsgId]. Sub code: [SOLCLIENT_SUBCODE_PARAM_NULL_PTR]. Return code: [Fail]

    There appear to be duplicate entries, some lines truncated or interrupted, plus some entries off by exactly one hour (timezone setting?) which makes me think there is possibly some multi-processing access occurring. The Solace PubSub+ Messaging API for Python does not support multi-processing access to the same messaging service.

    To get to the true source, though, we will need to understand your threading model, use of executors, and any other parallelism techniques, as this appears on the surface to be some sort of concurrency issue. See the sketch at the end of this post for one way to keep all receiver access on a single thread.

    Regards

    Ragnar
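
    To illustrate what I mean by single-threaded access: a minimal sketch (not your code; names like handoff_queue are made up) where the callback does all Solace API calls itself and only hands plain strings to the downstream worker through a bounded queue:

    import queue

    from solace.messaging.receiver.inbound_message import InboundMessage
    from solace.messaging.receiver.message_receiver import MessageHandler
    from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver

    # Bounded buffer between the Solace callback thread and the downstream
    # (Flink/Bytewax) worker; the bound applies backpressure instead of growing unbounded.
    handoff_queue: "queue.Queue[str]" = queue.Queue(maxsize=10_000)

    class QueueingMessageHandler(MessageHandler):
        def __init__(self, persistent_receiver: PersistentMessageReceiver):
            self.receiver = persistent_receiver

        def on_message(self, message: InboundMessage):
            # Every Solace API call stays on the API's dispatch thread.
            payload = message.get_payload_as_string()
            if payload is None:
                payload = message.get_payload_as_bytes().decode()
            # Acking here, before downstream processing, keeps the callback simple
            # but means a crash after this point can lose the message.
            self.receiver.ack(message)
            # Only the plain string crosses the thread boundary; put() blocks when full.
            handoff_queue.put(payload)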

  • Tuuliyo
    Tuuliyo Member Posts: 3
    edited November 25 #3

    Hello Ragnar,

    thank you for the quick response.
    Let me try to sum up the architecture behind my project. The general goal is to create a real-time backend with stream processing to aggregate any kind of data.
    My data generator is a self-designed POS application which sends out transaction data; this goes through a Traefik load balancer to a validation API that checks the schema and does some corrections on the data (e.g. checking that the item prices sum up to the total price). The validation service only lets validated data through, and this is sent to a Solace topic. Behind that topic is a queue. From that queue I receive the data with a Solace consumer and emit it to my Apache Flink job (which doesn't work at the moment; I have also tried it with Bytewax).

    Self-designed POS application: scaled to 5 replicas in the Docker Compose setup

    Traefik load balancer: just one

    Validation service: scaled to 5 replicas in the Docker Compose setup

    Solace consumer and therefore the Flink job: just one; the task manager has parallelism set to 4, with 2 replicas and 2 numOfTask each.

    All services are written in Python; for Flink I use PyFlink as the bridge to the underlying Java components.

    The Solace producer and Solace consumer are also written in Python (SMF), with the Solace HowTo GitHub samples as a reference.
    For now the complete pipeline just passes data through, because the Flink job has interruptions and does nothing. I tried to look into this, and it could be that the errors I receive trace back to the Flink job interrupting and not receiving the messages in the right way (no proper ACK handling on my side). Maybe that results in a scenario where the Flink job gets duplicates in some way, and because of the errors thrown back, the broker behaves as you see in my error log. (Just a guess, but maybe it helps.)

    Hope I could shed some light on this;

    if you need more, let me know.

    BR Jonas

  • Ragnar
    Ragnar Member, Employee Posts: 67 Solace Employee

    Hi,

    Thanks for all the details, that's very useful. We need to focus on the consumer, as that is where the error is occurring; the rest of your description looks good, and the messages that are published and queued are independent of the consumer.

    Solace consumer and therefore the Flink job: just one; the task manager has parallelism set to 4, with 2 replicas and 2 numOfTask each.

    I'm not sure what this implies. What is the task manager and how does it implement concurrency/parallelism?

    Meanwhile, another thing you can do to get more error logging in your consumer is to set the environment variable TRACE_SOLACE_MSG_LOG to warn (a minimal example of doing this follows below). This will turn on additional logging around message instantiation and destruction (initializer/finalizer), which might shed some light on how the native message pointer became 'nil' immediately within the initializer.

    Ragnar
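
    For example (assuming the variable is picked up when the API initializes; it can equally be set in the container environment, e.g. in docker compose):

    import os

    # Must be set before the Solace messaging service is created so the
    # API picks it up; "warn" turns on the extra message lifecycle logging.
    os.environ["TRACE_SOLACE_MSG_LOG"] = "warn"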

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 644 admin
    edited November 27 #5

    Hi team! Pro-tip: do 3 backticks to engage "code block" mode. You're both just using "single line code" mode. And make sure you do it after an [ENTER], not a Shift+[ENTER] like I often do.

    @Tuuliyo you can click the three dots in the top-right of your post and edit it.

    I'm wondering: since this error only seems to occur once you've received a LOT of messages from the queue, are you free-ing the messages once you're done with them? Is there any chance you're running out of memory? I guess if it's null pointer exceptions, that's probably not what's happening.

  • Tuuliyo
    Tuuliyo Member Posts: 3

    Hi @Aaron, thank you for the tip, I was wondering why that didn't work and just used the quick shortcut in the comment field. But I cannot edit the post anymore now, I can only bookmark it.

    I thought I was freeing them, but you are right! My first implementation was with Flink, using the Python libs. It turns out it is quite hard to create a custom source for Flink. Whatever… after emitting to the Flink job I sent the ack back to the broker. But I have figured out my problem now. First, I switched from Flink to Bytewax and refactored the Solace consumer implementation into Bytewax's connectors API. This avoids the thread problems (the Solace consumer was kind of blocking things, and sometimes the Flink job wasn't even submitted). I also avoided any queues or arrays to prevent OOM errors in general. With the switch to Bytewax's connectors API I am able to push the data into the pipeline and ack the message instantly, and the above errors no longer occur.

    To wrap it up: I guess it was a combination of OOM problems and the processing being blocked inside the on_message callback. After refactoring the code onto Bytewax's connector, making direct use of the messages instead of bridging them through a thread-safe queue, I was able to get this running! (A rough sketch of the pull-and-ack pattern is shown at the end of this post.)

    Thank you for the support. Time for me to get otel working with the broker 😉
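
    For completeness, a rough sketch of the pull-and-ack-immediately pattern I ended up with, independent of the Bytewax source plumbing. It assumes the persistent receiver's blocking receive_message(timeout) returns None when the timeout expires; the timeout value and batch size are arbitrary, and in my setup this kind of loop sits inside the Bytewax source's batch hook:

    from solace.messaging.receiver.persistent_message_receiver import PersistentMessageReceiver

    def poll_batch(receiver: PersistentMessageReceiver, max_messages: int = 100) -> list:
        """Pull up to max_messages payloads, acking each message right away."""
        batch = []
        for _ in range(max_messages):
            message = receiver.receive_message(timeout=100)  # timeout in milliseconds
            if message is None:
                break  # nothing more to read for now
            payload = message.get_payload_as_string()
            if payload is None:
                payload = message.get_payload_as_bytes().decode()
            receiver.ack(message)  # settle immediately, as described above
            batch.append(payload)
        return batch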