Applying backpressure to a flow by blocking in the MessageEventHandler

allmhhuran
allmhhuran Member Posts: 47 ✭✭✭
edited September 2021 in PubSub+ Event Broker #1

I have a few days of free time and so I am developing a new branch of my Solace subscriber wrapper library. I thought I'd play with exposing the message stream as an IAsyncEnumerable.

I decided to use a System.Threading.Channels.Channel with a bounded capacity for this. The idea is to (usually) quickly let the message event handler post to the channel even if the subscriber processing is occasionally a bit tardy. However, there may be times when bursty message rates cause the channel buffer to fill up. In those cases I want to apply back pressure to the solace components.

One way I could do this would be to actually disconnect the flow when the local buffer is full and reconnect once it has drained, but this seems like an expensive proposition that would require a lot of additional code. As an alternative, I was wondering what would happen if the message event handler simply blocked. To be clear, this would be an "atypical" situation; the idea would be to block only in cases where the subscriber's message processing is itself slowed down unexpectedly.

In other words, what if the message event handler is:

// buffer is the Channel
(_, args) => buffer.Writer.WriteAsync(args, ct).AsTask().Wait()

Feeding the Channel in this way would cause this function to hang if the channel buffer is full.

Will this in fact apply back pressure through the solace components, or does the library spin off invocation to the threadpool (in which case this could result in threadpool exhaustion in extreme cases)?

I believe I could also partly manage this via the guaranteed message window size and max delivered unacked messages, but if I want to allow subscribers to have some control over their own buffers without exposing too much of the Solace internal machinery, this question still arises.

As a corollary, I'm curious about the AckTimerInMsecs flow property. Is this the maximum amount of time the broker will wait for acknowledgement of a message before deciding it needs to be redelivered? Because 1000 milliseconds is awfully short if I want to be able to do any kind of processing before acknowledgement.

Best Answer

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 667 admin
    edited September 2021 #2 Answer ✓

    Hi @allmhhuran ..! How's it going?

    Ok, first off... don't disconnect the Flow! This will cause all unacknowledged messages to be put back onto the queue and made available for redelivery... not what you want! I'm very much hoping/assuming that your subscriber wrapper tooling allows the received messages to be ACKed later after being handed off and processed.

    You also can't block in the receiver callback... the C & .NET APIs only have one internal Context thread... if you block that up, a bunch of bad things can happen (timers stop firing, keep-alive messages won't get sent, etc.) and you'll be unhappy. Block long enough, your API will disconnect when you unblock due to keep-alive failures.

    What you want to do is call Stop() and Start() on your IFlow object (https://docs.solace.com/API-Developer-Online-Ref-Documentation/net/html/391b4a92-ec54-992a-d79e-d907cf57f71e.htm). That just "pauses" the messages coming on that specific flow. When your internal channel drains past 80% (or whatever), you can call Start() to get it going again.

    Secondly, on your questions around ACK timers and such... those are for transport ACKs, not for the explicit application ACKs. It's kind of how it's explained in this blog, but on the consumer side. https://solace.com/blog/understanding-guaranteed-message-publish-window-sizes-and-acknowledgement/

    The AD receive window size, and ACK thresholds and ACK timer are just related to transport/flow from the broker to the API. Nothing to do with your application processing logic. You can slow down the receipt of messages by tuning the window size down, but that's only if you really want to. As Tom mentioned, best to keep things at defaults unless you have a good reason.

    Finally, NOTE: not sure how big you are configuring your internal Channel object size, but the broker will only send you max-unacked-per-flow messages (a per-queue setting) to each connected consumer before it stops and waits for you to start ACKing some. So one easy way to also control messages into your app: if your Channel size is (say) 100 messages big, then you could set max-unacked-per-flow to 100 - AD_REC_WIN_SIZE (or something close to that).
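
    A rough sketch of that sizing rule, for illustration only. FlowSizing and SuggestedMaxUnacked are hypothetical names, and both inputs are values you supply from your own configuration; nothing here is read from the broker or the Solace API.

    ```csharp
    using System;

    // Hypothetical helper: not part of the Solace API.
    public static class FlowSizing
    {
        // Returns channelCapacity - adReceiveWindowSize, the
        // "100 - AD_REC_WIN_SIZE" rule of thumb from the post above.
        public static int SuggestedMaxUnacked(int channelCapacity, int adReceiveWindowSize)
        {
            if (adReceiveWindowSize <= 0)
                throw new ArgumentOutOfRangeException(nameof(adReceiveWindowSize));

            // If the channel is not larger than the window, the rule has no
            // usable answer: enlarge the channel or shrink the window instead.
            if (channelCapacity <= adReceiveWindowSize)
                throw new InvalidOperationException(
                    "Channel capacity must exceed the AD receive window size.");

            return channelCapacity - adReceiveWindowSize;
        }
    }
    ```

    With a channel of 100 and an assumed window size of 20, FlowSizing.SuggestedMaxUnacked(100, 20) gives 80.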

    Hope that helps!

Answers

  • TomF
    TomF Member, Employee Posts: 412 Solace Employee

    Hi @allmhhuran,
    AckTimerInMsecs relates to how often acknowledgements are sent from the API. We have a windowed ack mechanism. Imagine you set that window to 2 messages before you send an ack. You get one message.

    You need to define a timer that says "although the ack window isn't full, I've waited long enough so I'll send whatever acks I have now." That's what AckTimerInMsecs is.
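
    To make the window-plus-timer idea concrete, here is a minimal sketch. It is illustrative only and is not how the Solace library is actually implemented: AckBatcher, its threshold, and the send callback are all made-up names, and the current time is passed in explicitly so the behaviour is easy to follow.

    ```csharp
    using System;
    using System.Collections.Generic;

    // Illustrative only: a toy model of "flush on threshold or on timer".
    public sealed class AckBatcher
    {
        private readonly int _threshold;        // acks to collect before sending
        private readonly TimeSpan _maxDelay;    // plays the role of the ack timer
        private readonly Action<long[]> _send;  // hypothetical transport callback
        private readonly List<long> _pending = new List<long>();
        private DateTime _oldestPendingAt;

        public AckBatcher(int threshold, TimeSpan maxDelay, Action<long[]> send)
        {
            _threshold = threshold;
            _maxDelay = maxDelay;
            _send = send;
        }

        // Record one message; send the batch as soon as the threshold is reached.
        public void OnReceived(long messageId, DateTime now)
        {
            if (_pending.Count == 0) _oldestPendingAt = now;
            _pending.Add(messageId);
            if (_pending.Count >= _threshold) Flush();
        }

        // Called periodically; sends a partial batch that has waited long enough.
        public void OnTimerTick(DateTime now)
        {
            if (_pending.Count > 0 && now - _oldestPendingAt >= _maxDelay) Flush();
        }

        private void Flush()
        {
            _send(_pending.ToArray());
            _pending.Clear();
        }
    }
    ```

    With a threshold of 2 and a one-second delay, a single received message sits in the batch until the timer tick at one second sends it on its own. Two messages arriving together are sent immediately without waiting for the timer.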

    The broker will only re-send a message if the flow is disconnected without an ack for that message.

    I'll get back to you on the back pressure question.

  • allmhhuran
    allmhhuran Member Posts: 47 ✭✭✭

    Ah ok, so if I understand you correctly that's the setting for the buffered-ack timer inside the solace library code (that actually acks back to the VPN), and not related to the time between a client receiving a message and calling IFlow.Ack(). Probably a good one to leave as default I expect.

  • TomF
    TomF Member, Employee Posts: 412 Solace Employee

    Yes, exactly. Best left at the default unless you have an explicit reason to change it.

  • allmhhuran
    allmhhuran Member Posts: 47 ✭✭✭

    For technical completeness, as a slight improvement to the above sample code (given that posting to the channel is expected to work most of the time), I decided to modify the implementation as below, but the question itself is unchanged:

    messageEventHandler: (_, args) =>
    {
       // deserialization to a known msg type omitted
       if (!buffer.Writer.TryWrite(msg))
       {
          buffer.Writer.WriteAsync(msg).AsTask().Wait();
       }
    }
    
  • allmhhuran
    allmhhuran Member Posts: 47 ✭✭✭
    edited September 2021 #8

    Thanks Aaron, great info.

    Yeah, the subscriber wrapper exposes the broker as a pair of custom interfaces, IProducer (of messages) and an IConsumer (of acknowledgements), with the idea being that the devs just wire up a processing pipeline between those two endpoints... probably using a weak-and-linear-but-hey-its-easy-to-use fluent wrapper I wrote for System.Threading.Tasks.Dataflow. If they want branching they can write their own damn wrapper :D

    I do have a convenient point to call Stop() (basically, just after the TryWrite in my example code above). Detecting that it is safe to restart is not quite as easy, but not very difficult either.

    On the buffer, yeah, I have the default size configured to be max unacked + receive window size, because that just makes everything easy, no need to stop or start anything or worry about backpressure since the VPN will "self-backpressure" while waiting for acks. Right now I do expose the ability for a subscriber to change that buffer size in the constructor of the wrapper implementation, but I think I might just remove that ability to make my life easier :wink: Then I don't ever have to worry about manipulating the flow.

    Thanks again for going deep on the implementation details! Always a pleasure.

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 667 admin

    Sounds like you're building your own Spring Cloud Streams framework, but for .NET..!

    Restarting the Flow could be checked every time a receiver/consumer pulls a message out of your Channel... as soon as it's below some threshold, then restart.

    Regarding buffer size, resizing, etc... I don't think there's any way to programmatically get the max-unacked-per-flow setting from the queue that you're connecting to (except via SEMP, but that's not part of the messaging APIs). So because you might be changing that setting at the queue, perhaps make it too big, you'll still have to have the start/stop logic in your app. Unless you are standardizing on a specific setting for all your queues that allow you to bake-in that setting into your wrapper. So allowing the apps to resize that buffer maybe isn't a bad thing..?

    Anyhow, happy to help! Even if I am a bit slow at responding..! 😅

  • allmhhuran
    allmhhuran Member Posts: 47 ✭✭✭
    edited September 2021 #10

    Haha, that description is probably good for any internal marketing hype I want to create, but the implementation is rather more modest XD

    I was thinking that the best place to restart the flow (if I go with that model at all) would be inside the acknowledgement wrapper. My reasoning here is that from the client's point of view, they don't really care about messages in flight on the network or the internal constraints of the system. What they care about when it comes to buffering is the amount of memory they're using. Being able to specify a known maximum capacity for their processing pipeline "as a whole" is all they're really going to care about, and since messages effectively leave their memory space at acknowledgement time (with the small caveat of some timed buffering of acknowledgement inside the solace library itself, but that's just a set of long for the ADMessageId right? So, pretty small), that's the specification they might want to provide. The slight pain here is more about the inability to read the current count inside a Channel, since it's more like an IEnumerable than an IList, but I could just switch to a ConcurrentQueue, or Interlocked.Increment / Interlocked.Decrement on a private field.
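
    A minimal sketch of the Interlocked option, counting messages from receipt until acknowledgement. InFlightCounter is a hypothetical name; the assumption is that OnReceived is called from the message event handler and OnAcknowledged from the ack wrapper.

    ```csharp
    using System.Threading;

    // Hypothetical helper: tracks messages that have been received but not yet acked.
    public sealed class InFlightCounter
    {
        private int _count;

        // Snapshot of the current count; may be stale by the time it is used.
        public int Current => Volatile.Read(ref _count);

        // Call when a message is handed to the pipeline. Returns the new count.
        public int OnReceived() => Interlocked.Increment(ref _count);

        // Call after the message has been acknowledged. Returns the new count.
        public int OnAcknowledged() => Interlocked.Decrement(ref _count);
    }
    ```

    Because the increment and decrement each return the new value, the caller can compare that return value against a threshold directly instead of reading Current separately.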

    The "subscriber" object is a per-topic entity in my model, and encapsulates a flow. That is to say, the flow is instantiated at construction time, so I do have the option of setting the max unacked messages in the ISession.CreateFlow call in the subscription constructor, so that works - if I decide not to just hardcode it as you mentioned. Perhaps there is still some benefit in having both controls in the implementation - i.e., stopping and starting flows as well as letting the unacked message count provide flow control.

    Edit:

    Thinking about that a little more, there's an off-by-one error in my thoughts there, because if the flow is stopped only when the buffer refuses a message, then that message is effectively lost to the ether. So the flow has to be stopped at the message that fills the buffer, not at the message that would overflow the buffer. But in that case the flow could be "bouncing off the limiter" a lot, which would be inefficient. So it does seem sensible to make the buffer size one larger than max unacked plus ad window size. But then it follows that stopping the flow isn't really needed, since the flow would already be throttled by the broker. So explicit flow control only seems necessary if the buffer is too small, and since I have control over all of the parameters, that shouldn't be a problem.

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 667 admin

    When you Stop() the IFlow, there is a chance to get up-to-a-full-AD-Window's worth of messages arriving after stopping... because nothing in this world is instantaneous, there is a delay between calling Stop(), which makes the API send a control message to the broker, at which point it will stop sending any further messages.

    And so yeah, you definitely don't want to be "bouncing off the limiter", 1 message at a time stop/start. In the JCSMP API, there is a similar internal queue between 2 threads: one reading from the socket, one dispatching to the application. When that queue hits 5000 messages, stop() is called on the consumer object (the connection) until the queue drops below 2000 (? I think?) messages. So quite a big range. Don't make your stop/start logic too tight.

    And if you trust your max-unacked-per-flow settings on all your queues, that's totally fine to use as the primary "limiter"; it means you don't have to keep going stop/start/stop/start with the broker. But it is still important to have those controls in your app in case someone messes up a queue setting and you don't want to blow up the memory in your app.
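
    The wide stop/start range described above could be sketched roughly like this. WatermarkGate is a hypothetical name, and the two callbacks are stand-ins for whatever calls Stop() and Start() on the flow; the 5000/2000 figures in the usage note are just the loosely remembered JCSMP numbers from this post, not recommended values.

    ```csharp
    using System;

    // Hypothetical helper: pauses at a high watermark, resumes below a low one.
    public sealed class WatermarkGate
    {
        private readonly Action _stop;   // stand-in for stopping the flow
        private readonly Action _start;  // stand-in for starting the flow
        private readonly int _high;
        private readonly int _low;
        private readonly object _sync = new object();
        private bool _paused;

        public WatermarkGate(Action stop, Action start, int highWatermark, int lowWatermark)
        {
            if (lowWatermark >= highWatermark)
                throw new ArgumentException("Low watermark must be below the high watermark.");
            _stop = stop;
            _start = start;
            _high = highWatermark;
            _low = lowWatermark;
        }

        public bool IsPaused { get { lock (_sync) { return _paused; } } }

        // Call with the current buffered count after each enqueue or dequeue.
        public void Observe(int bufferedCount)
        {
            lock (_sync)
            {
                if (!_paused && bufferedCount >= _high)
                {
                    _stop();
                    _paused = true;
                }
                else if (_paused && bufferedCount < _low)
                {
                    _start();
                    _paused = false;
                }
            }
        }
    }
    ```

    Constructed as new WatermarkGate(stop, start, 5000, 2000), the gate stops once at 5000 buffered messages and does nothing further until the count falls below 2000, so counts hovering between the two marks never trigger a stop/start pair. Since up to a full AD window of messages can still arrive after stopping, the buffer itself needs headroom above the high watermark.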

  • allmhhuran
    allmhhuran Member Posts: 47 ✭✭✭
    edited September 2021 #12

    Hehe, I actually ended up with almost exactly the same logic a few days ago. Here, _carrier is a thin wrapper around bits of the IFlow. The MessageEventHandler forwards to:

    _carrier.ItemAvailable += (_, message) =>
    {
       if (!_buffer.Writer.TryWrite(message)) throw // ...
       if (_buffer.Reader.Count > maxUnackedMessageCount) // > rather than >= since the flow will typically have taken over anyway
       {
          lock (_carrier)
          {
             if (!_paused)
             {
                _carrier.Stop();
                _paused = true;
                _logger?.LogWarning("Backpressure on {subscription}", this);
             }
          }
       }
    };
    

    Then on the other side, the IFlow.Ack is wrapped like this:

    public async Task AcknowledgeAsync(T payload, CancellationToken ct)
    {
       if (_buffer.Reader.Count < _resumeAtCount)
       {
          lock (_carrier)
          {
             if (_paused)
             {
                _carrier.Start();
                _paused = false;
             }
          }
       }
       await _ackRetryPolicy.RetryForever(() => _carrier.TryAck(payload), ct);
    }
    

    There is some inefficiency here, in that full locks are sometimes taken unnecessarily. However, in the message event handler that only happens when we actually need to stop the flow or the flow is already stopped. In the ack wrapper it only happens when there is no real penalty because the message rate is low, since I have resumeAtCount set to 10% of whatever the max unacked count is. Enough to reduce periodic latencies, but not so much that the condition is regularly hit except when there is no performance issue anyway.

    I also set the buffer size to double the max unacked count to be completely safe even if messages dribble in after the flow is stopped, and of course I actually do set the max unacked on the flow, so typically the flow control is really being done by the Solace IFlow anyway (hence the name, I suppose!)

    There can theoretically be a race for the _buffer.Reader.Count, and as you mentioned, keeping the max and min thresholds apart reduces the likelihood. But the way I figure, even if things happen out of the expected order, it can never result in a long-term problem since the buffer count eventually becomes stable no matter what.

  • allmhhuran
    allmhhuran Member Posts: 47 ✭✭✭

    Oh, one final comment on that code... If I understand correctly, stopping the IFlow doesn't just stop the message events from being raised, it also prevents acknowledgements from going through. I... actually didn't look into that thoroughly, there was just one test execution where I had the code a particular way and it unexpectedly sat around doing nothing. I reasoned that this was why.

    That being the case, the ack wrapper has to restart the flow before sending off the ack. It would be nice if stopped flows still processed acks, but no longer raised events. Then the ack retry could be done before the flow restart in the code above, which is strictly more correct.

    Or maybe I'm wrong about that, and the acks do still propagate even if the flow is stopped, and something else was the source of the problem. I have fiddled with the code so much since then that I doubt I could find it again in the commit history.