Any tricks to speed up Commit() using an ITransactedSession in C#?

allmhhuran
allmhhuran Member Posts: 47 ✭✭✭
edited March 2024 in PubSub+ Event Broker #1

This question follows on from this discussion :

The objective here is to…

  • publish guaranteed messages, and
  • be certain that the broker receives them in the order in which they were generated so that re-ordering does not have to be done on the subscriber side (for reasons summarised in the linked thread), and
  • Avoid introducing logic that may unnecessarily cause duplicate publication (ie, try to achieve exactly-once delivery insofar as it is actually possible to do that).

To that end I have decided to use a transaction as a way of sending and then committing batches of messages, with retry performed at the level of the whole batch. In simplified pseudocode…

while (messages to publish and batch is not full) add message to batch
do
foreach (message in batch) Send()
Commit()
while (commit failed)

The solution makes use of a last value queue and queue browser to handle edge-case commit failures, and a ring-buffer of pre-allocated messages, which accounts for the bulk of the code not directly related to the above pseudocode.

The .csproj can be found on github.

What I have noticed is that the Commit() call takes quite a long time - between 100 and 130 milliseconds. Here's the send-and-commit block .

The overall throughput is heavily dependent on the duration of the Commit() operation, since I have to block on that call before starting the next batch (otherwise the retransmission of the batch would put messages out of order in the overall sequence).

Using this technique, one would expect the maximum message rate per second to be roughly:commit_batch_size * 1000 / commit_wait_in_mswhich is 1600 messages per second if batch size is 200 and the commit blocks for ~125 milliseconds. And that is indeed what I am seeing.

I have a separate project which I am using to measure the "ping time" to the broker (the time to send one message and get an ack or nack), which for me averages around 20 ms. So the Commit seems to have about 100ms of overhead not accounted for by pure latency.


Just to add a bit more to the complexity of this question… 😅

I don't know whether this additional delay is happening due to actual commit processing time, or perhaps some kind of response buffering? I have a separate project on github which is my "ping" publisher, and in that project I provide the ability to set the maximum number of messages that are waiting for an ack/nack via the maxUnacked parameter, as well as setting the value for the https://docs.solace.com/API-Developer-Online-Ref-Documentation/net/html/247f4507-a3c6-185f-03cc-c1bb45e9bca4.htm .

I have noticed that in the ping publisher - which is not using transactions but rather just doing send/ack, with configurable concurrency - there are certain "threshholds" that significantly decrease throughput. For example, calling Ping(count: 1000, maxUnacked: 1, publishWindowSize: 5); results in ~85 messages per second (completely reasonable since every send is effectively serialised if maxUnacked is set to 1), but calling Ping(count: 1000, maxUnacked: 1, publishWindowSize: 6); results in almost exactly one message per second. I expect this is due to a 1-second-maximum window release timer (per https://solace.com/blog/understanding-guaranteed-message-publish-window-sizes-and-acknowledgement/). Worth noting that the standard deviation is also enormous - the last few messages take a lot longer than the rest - again, presumably due to the solace router waiting for a timer to elapse before sending a window that is not yet complete (or more than 1/3rd complete), which (for the last handful of messages) will depend of the result of total_message_count % window_size.

Could it be that the Commit() response back to the solace library is being buffered in a similar way to acks/nacks?

Tagged:

Best Answer

  • nicholasdgoodman
    nicholasdgoodman Member, Employee Posts: 43 Solace Employee
    #2 Answer ✓

    Greetings again @allmhhuran ,

    So after chatting around with a number of my colleagues, there does not seem to be any deterministic way to auto-magically increase the performance of the actual commit operation - as there are some complexities behind the scenes which sometimes require duplicate disk writes, something called "internal republishing", and many other mini steps that occur when calling commit.

    From some basic experimentation, I also observe that most (or maybe none) of the standard publish windowing settings and ACK settings have much of an effect on total throughput when using the micro-batch approach which we previously discussed and you have shared on your GitHub repo. My understanding is there are no thresholds, delays, or other complexities added to the commit / commit response sequence, and that it seems the observed latency is a pure byproduct of the computationally expensive action of committing itself.

    The next approach I tried was to use multiple Transacted Sessions in series. Similar to your code which uses a dual ring buffer of IMessage instances to cut down on factory costs, I attempted a similar approach of using up to 16 separate session instances, filling each with a batch, committing on a separate Task instance and once the commit was complete, releasing the session instance and committing the next (to preserve order).

    I had hoped that in using "sliding window" transactions I would see significant performance boosts, but it appears this is negligible.

    Feeling out of options here, but would you be interested in revisiting the original question and seeing a solution which moves the transaction-like semantics to the publisher and subscriber? I have a relatively simple algorithm for ensuring a consumer gets messages in order and exactly once; even though they may be out-of-order and duplicated on the queue itself.

Answers

  • nicholasdgoodman
    nicholasdgoodman Member, Employee Posts: 43 Solace Employee
    #3 Answer ✓

    Greetings again @allmhhuran ,

    So after chatting around with a number of my colleagues, there does not seem to be any deterministic way to auto-magically increase the performance of the actual commit operation - as there are some complexities behind the scenes which sometimes require duplicate disk writes, something called "internal republishing", and many other mini steps that occur when calling commit.

    From some basic experimentation, I also observe that most (or maybe none) of the standard publish windowing settings and ACK settings have much of an effect on total throughput when using the micro-batch approach which we previously discussed and you have shared on your GitHub repo. My understanding is there are no thresholds, delays, or other complexities added to the commit / commit response sequence, and that it seems the observed latency is a pure byproduct of the computationally expensive action of committing itself.

    The next approach I tried was to use multiple Transacted Sessions in series. Similar to your code which uses a dual ring buffer of IMessage instances to cut down on factory costs, I attempted a similar approach of using up to 16 separate session instances, filling each with a batch, committing on a separate Task instance and once the commit was complete, releasing the session instance and committing the next (to preserve order).

    I had hoped that in using "sliding window" transactions I would see significant performance boosts, but it appears this is negligible.

    Feeling out of options here, but would you be interested in revisiting the original question and seeing a solution which moves the transaction-like semantics to the publisher and subscriber? I have a relatively simple algorithm for ensuring a consumer gets messages in order and exactly once; even though they may be out-of-order and duplicated on the queue itself.

  • allmhhuran
    allmhhuran Member Posts: 47 ✭✭✭
    edited March 2024 #4

    Thanks @nicholasdgoodman

    I ran this logic through a battery of scenarios and I did start to notice a lot of variation on the commit duration… sometimes as high as 200ms, but at other times it was as low as the ping time. But these differences did not correlate with changes to batch size, or any session properties, or anything like that, so I was mentally prepared to hear that the delay was due to internal machinations on the broker 🙂

    That being the case, subscriber reordering does seem like a bit of a necessity to get the highest possible throughput.

    When thinking about my options to I did think of a few possible algorithms involving subscriber reordering, even though I was ideally trying to avoid it, because it is of course a fairly common design. I think my best idea was to publish messages as a linked list, with each message containing two values (which can be numbers, or guids, or anything really), one for "this message", and one for the "next message". While this does require that the subscriber have knowledge about the "most out of order something can be", it doesn't require a durable, gapless sequence generator, and in the case of a crash/host restart/etc the same LVQ technique can be used by the publisher to get the last "next message" that was sent, so it can continue the sequence unbroken (although this time I think it would be better to add the topic subscription to populate the LVQ rather than pushing to the LVQ directly)

    But if you have another algorithm then yes, I'd be happy to see it!

  • nicholasdgoodman
    nicholasdgoodman Member, Employee Posts: 43 Solace Employee
    edited March 2024 #5

    @allmhhuran

    So the general notion is to essentially move the transactional semantics into the publishers and consumers in such a way that messages on the queue will be "mostly ordered" and easily re-assembled on the consumer side in the case of errors. The message stream will include both payload and metadata which can be implemented in a few different ways:

    • As extra headers / user properties on the messages themselves
    • As part of the payload, using an "envelope" with control statements and data living side by side
    • As separate, special "flow control" messages on the queue itself

    Similar to transactions, messages will be clumped into batches of a pre-determined size, though they will still be sent individually and individually acknowledged. But whichever approach is used, a simple publisher and consumer algorithm for guaranteed, in-order messages is as follows:

    Publisher Side

    All messages are assigned a batch ID and sequence number. Batch IDs should be monotonically increasing but do not need to be continuous (e.g., Epoch timestamp or similar). Sequence numbers are from 0 to n-1, where n is the configured max batch size.

    • The publisher creates a buffer / array for messages in the current transaction
    • As messages are received they are assigned batch ID and sequence number, which are are published with the payload immediately to the broker. Optionally, include the batch size as a header.
    • If a given message receives an ack failure, resend it. Optionally, resend all other messages in the batch after the failed message (see comments below).
    • When the current batch is full, start a new batch instance (but keep the previous one) and continue sending messages as they are generated
    • Once all messages within a batch have been acknowledged, publish a "commit" command - either by using a special message type, or by adding a special header to the next message - which contains the batch ID.
    • Once the "commit" message has been Acked, free the related batch from memory (either re-use or dispose). If the commit message itself fails, resend it until it succeeds.

    Consumer Side

    The consumer's job is to buffer messages into their respective batches until it receives a batch commit message, at which point it releases the entire batch to the application consumer code for processing and acknowledgement.

    • When a message is received, extract its batch ID, sequence number, and optionally batch size metadata.
    • If the batch ID is not known, create a new batch. If the batch ID is less than all known batches, mark the new batch as the "current batch".
    • Insert the message into the batch message array / list in the correct position as indicated by the sequence number. If the message is a duplicate of a previously received message, immediately Ack the previously received one, and overwrite it with the new message (see comments below).
    • When a batch "commit" message is received, mark the batch "ready to consume"
    • If the "current batch" is "ready to consume" dispatch individual messages to the processor handling code and upon successful processing, Ack each message.
    • Once all messages in a batch have been processed and Acked, the batch is complete. Dispose or recycle it, and mark the next batch with the lowest batch ID as the "current batch".

    Sample Sequence

    In the following example, batch IDs are an arbitrary 4-digit number, and batch sizes are 5. A special sequence number C is used to indicate a commit message, but in practice this could be represented as simply the number n since valid sequence numbers are 0 to n-1.

          Publish             Ack                Enqeued         Consumed
     [Batch/Seq: Data]    [Batch/Seq: ?]    [Batch/Seq: Data]   [Data...]
    ---------------------------------------------------------------------------
          8190/0: A                             8190/0: A    
          8190/1: B                             8190/1: B    
          8190/2: C                             8190/2: C    
          8190/3: D                             8190/3: D    
          8190/4: E                             8190/4: E    
          8193/0: F                             8193/0: F    
          8193/1: G          8190/0               --- 
          8193/2: H          8190/1             8193/2: H    
          8193/3: I          8190/2             8193/3: I    
          8193/4: J          8190/3             8193/4: J    
          8195/0: K          8190/4             8195/0: K    
          8190/C             8193/0             8190/C          A, B, C, D, E
          8195/1: L          8193/1: NAK        8195/1: L    
          8193/1: G          8193/2             8193/1: G    
          8195/2: M          8193/3               --- 
          8195/3: N          8193/4               --- 
          8195/4: O          8195/0             8195/4: O    
          8198/0: P          8190/C             8198/0: P    
          8198/1: Q          8195/1             8198/1: Q    
          8198/2: R          8193/1             8198/2: R
          8193/C             8195/2: NAK        8193/C          F, G, H, I, J  
          8195/2: M          8195/3: NAK        8195/2: M
          8195/3: N          8195/4             8195/3: N    
          8198/3: S          8198/0             8198/3: S    
          8198/4: T          8198/1             8198/4: T    
          8199/0: U          8198/2             8199/0: U    
          8199/1: V          8193/C             8199/1: V    
          8199/2: W          8195/2             8199/2: W    
          8199/3: X          8195/3             8199/3: X    
          8195/C             8198/3             8195/C          K, L, M, N, O
          8199/4: Y          8198/4             8199/4: Y    
          8198/C             8199/0             8198/C          P, Q, R, S, T
          8203/0: Z          8199/1             8203/0: Z    
                             8199/2           
                             8199/3           
                             8199/4           
          8199/C             8198/C             8199/C          U, V, W, X, Y
                             8203/0           
    
         (timeout)      
          8203/C                                8203/C          Z
    

    Edge Cases, Startup and Shutdown

    Because messages are processed one-at-a-time from the queue (unless transactions are used on the consumer side) it is theoretically possible for a consumer to have gone down while it was in the middle of Ack-ing messages from a single batch. This means upon resuming the consumer service it is possible that when a batch commit message is received, the batch may be not completely full.

    If the producer code opted to only resend the single failed message (as shown above) this can result in a sparse batch, and a consumer will have to determine how to proceed. If all messages after a failure are resent, instead, an incomplete batch on the consumer side will not be sparse, but rather the tail end of a given batch and likely can still be safely processed.

    Likewise, publishers must take care when exiting to send a final commit message before disconnecting, or the queue will contain partial batches which a consumer will never process (without additional logic to address this edge case). If batch IDs can be tied to the source data itself instead of timestamp or other universal monotonically increasing value, this can also be mitigated as a batch ID can be resumed after publisher restart.

  • allmhhuran
    allmhhuran Member Posts: 47 ✭✭✭

    That's an interesting algorithm. As you mention in your "edge cases", it does seem to create some difficulty in the case of, say, a publisher exception or unexpected termination. That could still be handled with an LVQ (to pick up the last batch id so that the "end batch" control message can still be sent), but in the meantime a subscriber is waiting with an incomplete batch. Not ideal but also not really a huge issue.

    In general it seems to be an algorithm with a lot of moving parts, so to speak. A lot of code to handle the different functional elements. I think it would work but, in my mind's eye at least, the linked-list approach involves less control code. The main disadvantage of the linked list is that without something like a batch id the subscriber just has to be configured with some value for how long they will wait to fill a gap in the list, which will be the same as the size of the publisher ringbuffer, so the subscriber has to know something about the publisher implementation. But if I really wanted to avoid that, the size of the ringbuffer could be included in every message (like the batch size value in your algorithm).

    I think I will do a draft implementation of both to check my intuition, and get someone else to pick based on which code they find easier to understand!