Receive the latest message only for both direct message and persistent message - python
Hi all, I recently started working on a project that requires data streaming. Even if there is latency in processing messages, I would like the consumer to always take the latest message.
I am a Python developer and would be grateful if someone could tell me how to achieve this for both the direct message and persistent message cases. Thanks a lot!
Answers
-
Hi @BrianChan. This is not something easily achievable normally, simply because in a distributed system it takes time for the message to get from the broker to the application. Inside the broker there is an outbound buffer for your client, then there is a TCP socket buffer, then there is the data on the wire, then another TCP socket buffer on your application's computer, then into the API and dispatched to the application. When you say "latest message", is that the very latest at the broker? How could that be achieved? And would this be per-topic? What if you're receiving a whole bunch of different topics interleaved... there's no way to know which is the "latest one" until your application receives it.
Backing up a bit... how long does it take to process a message? Are you finding there are messages "piling up" somewhere? There are some features and tunable knobs in Solace that can force the broker to automatically discard messages if a consumer is too slow... but this essentially means you're losing messages. And again there's no per-topic control of this... it's all one big pipe. However, if you'd only like to receive, say, one message per second, and the stream you're subscribing to is 100 msgs/s, then there is a feature called Eliding which might help. But it's tricky sometimes to use, and I'd recommend if we could explore other ways to achieve what you want, or avoid what you don't want.
How many topics are you subscribing to (not subscriptions, if using wildcards), but how many unique message topics would you be receiving? What are your rates? Can you parallelize? Is order important? Is loss ok? Is your publisher inserting sequence numbers so you can detect gaps? etc. etc.
Looking forward to hearing more..!
0 -
Hi @Aaron, thank you for the reply. In detail, there is only one topic, and losing messages is not a problem, as processing outdated messages is not meaningful in my case. The main goal is to keep processing the latest message.
Actually, while going through the Solace message broker documentation, I also came across the term "Eliding", and this should be what I am looking for. However, I don't know how to implement this in my Python program.
Besides, for persistent messages, there is a term called "Last Value Queue" which I think may be relevant, but I am not familiar with it either.
Looking forward to hearing from your team. Thank you very much.
0 -
Right, ok. So Eliding is useful in Direct messaging applications where you only want to receive messages at a certain rate... similar to my comment above, if the publisher is pushing out 100 msgs/s, and you only want to receive 1 message per second, the broker will drop messages for you. It does this using a "window", whose length is determined by the eliding delay. The eliding delay is part of your consumer's client-profile. Note that the messages will also have to be published with a particular flag to tell the broker that the messages are "eliding eligible". This is exposed in the Python API: solace.messaging.config.solace_properties.message_properties.ELIDING_ELIGIBLE. You could give all that a try.
Otherwise, yes for Guaranteed messages, you could use an LVQ, a queue with a max-spool-usage of 0MB, which tells the broker that it can only hold on to one message, the most recent one it received. For your use case, this might sound more appropriate. Added benefit of LVQ is that since it's Guaranteed messages, if the broker fails over, or your client disconnects/reconnects, it can still get the last message that was received. So just subscribe your LVQ to your topic(s) and try it out.
0 -
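To make the eliding "window" described above concrete, here is a minimal pure-Python sketch. It is a simplified model only, not the broker's actual algorithm: it assumes messages arrive in order and that exactly one message (the newest) survives each window of length equal to the eliding delay. The function name `elide` and the payloads are hypothetical.

```python
from typing import Dict, Iterable, List, Tuple

Message = Tuple[int, str]  # (arrival time in milliseconds, payload)


def elide(messages: Iterable[Message], delay_ms: int) -> List[Message]:
    """Keep only the newest message in each window of length delay_ms.

    Simplified model: assumes messages are supplied in arrival order.
    """
    if delay_ms <= 0:
        raise ValueError("delay_ms must be positive")
    latest_per_window: Dict[int, Message] = {}
    for arrival_ms, payload in messages:
        window = arrival_ms // delay_ms
        # A newer message in the same window overwrites the older one.
        latest_per_window[window] = (arrival_ms, payload)
    return [latest_per_window[w] for w in sorted(latest_per_window)]


# A publisher sending 100 msgs/s (one every 10 ms) for 3 seconds.
stream = [(i * 10, f"tick-{i}") for i in range(300)]

# With a 1-second eliding delay the subscriber sees one message per second.
delivered = elide(stream, delay_ms=1000)
print([payload for _, payload in delivered])
```

In this model the 300 published messages collapse to three delivered ones, each the last message of its one-second window.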
Hi @Aaron, actually I saw the same code above for Eliding in the Python documentation. However, I don't know how to apply it in my Python program. Let's take the example on GitHub: under the pattern folder, there is a direct_subscriber.py. May I ask how I should modify the script so that it performs Eliding?
0 -
For the LVQ, just set the "Messages Queued Quota (MB)" to 0. Easy.
For Eliding, there needs to be some broker-side changes done first, in your client profile. On the PubSub+ Manager, go into "Access Control", "Client Profile":
Enable it and set whatever delay you want. Note that this is for the Subscriber. You might consider making a 2nd client-profile specifically for eliding, and then make a new client-username that uses that profile instead.
Then in your Python app, you have to add that ELIDING_ELIGIBLE flag when you're building your message. I think, not sure. I'm not a Python guy. But I think that's how you do it.
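A minimal sketch of that publisher-side step, untested against a real broker. It assumes that the PubSub+ Python API's `publish()` accepts an `additional_message_properties` dict and that the key `message_properties.ELIDING_ELIGIBLE` with the value `True` is honoured there; please check both against the API reference. `publish_elidable`, `RecordingPublisher` and the topic name `demo/latest` are hypothetical names, and the stand-in publisher exists only so the snippet runs without a broker.

```python
from typing import Any, Dict, List


def publish_elidable(publisher: Any, destination: Any, payload: str,
                     eliding_key: str) -> None:
    """Publish payload with the eliding-eligible flag attached.

    Assumption: publisher.publish() takes additional_message_properties,
    as the PubSub+ Python direct publisher is believed to.
    """
    publisher.publish(
        message=payload,
        destination=destination,
        additional_message_properties={eliding_key: True},
    )


class RecordingPublisher:
    """Hypothetical stand-in that records calls instead of sending them."""

    def __init__(self) -> None:
        self.calls: List[Dict[str, Any]] = []

    def publish(self, **kwargs: Any) -> None:
        self.calls.append(kwargs)


fake = RecordingPublisher()
publish_elidable(fake, "demo/latest", "hello", "eliding-key-placeholder")
print(fake.calls)

# Against a real broker (not run here), with a started direct publisher
# such as the one in direct_publisher.py, the call would look like:
#
#   from solace.messaging.config.solace_properties import message_properties
#   from solace.messaging.resources.topic import Topic
#   publish_elidable(direct_publisher, Topic.of("demo/latest"), "hello",
#                    message_properties.ELIDING_ELIGIBLE)
```

As noted earlier in the thread, the flag belongs on the publishing side, while the eliding delay in the client profile applies to the subscriber.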
0 -
Hi @Aaron, sorry to bother you. I tried the LVQ and set the "Messages Queued Quota (MB)" to 0. However, when running the publisher and the subscriber at the same time, I found that the subscriber is still consuming message by message. Only when I kill the subscriber and restart it does it consume the latest message, which is not so practical. Here are some screenshots describing the issue.
0 -
@Aaron Besides, regarding adding the ELIDING_ELIGIBLE flag to the message, does that mean I have to set it in my publisher-side program? As the flag is a property of the message, it should be set before publishing, right?
Should I set this when preparing the "outbound_msg", following the 'direct_publisher.py' example? May I ask whether it is possible to pass my query to a teammate who is familiar with Python?
Thank you very much.
0 -
@Tamimi can you help with the specific Python question about setting the "eliding eligible" flag on published messages?
@BrianChan is your subscriber able to keep up with the full rate of published messages? If you crank up your publish rate, or deliberately slow down the subscriber, you'll see that it's only receiving the latest message.
OH, just thought of something... it might be "pre-caching" on the subscriber API. So, try this, see if it changes: in the queue's configuration, you'll find a setting for "Maximum Delivered Unacknowledged Messages per Flow". Set that to 1. That will stop the API from pre-fetching any additional messages.
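The two points above (a subscriber that keeps up sees every message, and pre-fetching can hand it stale ones) can be illustrated with a toy model. This is only a sketch under simplifying assumptions, not how the broker is implemented: one message is published per tick into a single-slot queue, the broker delivers the slot's message whenever fewer than `window` messages are delivered but unacknowledged, and the consumer acknowledges a message after `process_ticks` ticks. The function name `simulate` and its parameters are hypothetical.

```python
from collections import deque
from typing import Deque, List


def simulate(window: int, process_ticks: int, total_ticks: int) -> List[int]:
    """Return the sequence numbers the consumer finishes processing.

    window        : max delivered-but-unacknowledged messages (toy model)
    process_ticks : ticks the consumer needs per message
    total_ticks   : how long the publisher runs (one message per tick)
    """
    in_flight: Deque[int] = deque()  # delivered, not yet acknowledged
    remaining = process_ticks        # ticks left on the message being processed
    processed: List[int] = []

    for tick in range(1, total_ticks + 1):
        slot = tick  # the new message overwrites whatever the queue held
        if len(in_flight) < window:
            in_flight.append(slot)   # broker delivers the newest message
        if in_flight:
            remaining -= 1
            if remaining == 0:
                processed.append(in_flight.popleft())  # acknowledge
                remaining = process_ticks
    return processed


print("fast consumer, window 1:", simulate(window=1, process_ticks=1, total_ticks=5))
print("slow consumer, window 1:", simulate(window=1, process_ticks=3, total_ticks=9))
print("slow consumer, window 3:", simulate(window=3, process_ticks=3, total_ticks=9))
```

In this model a fast consumer processes 1, 2, 3, 4, 5 (message by message, as observed), a slow consumer with a window of 1 skips ahead to 1, 4, 7, and the same slow consumer with a window of 3 works through the pre-fetched 1, 2, 3 while newer messages keep arriving.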
0