Using an ITransactedSession and handling errors with the .net API
Suppose I am publishing guaranteed messages to a topic. I want to avoid sending duplicate messages. I am also going to send messages in transactional batches to make it easier to guarantee the broker receives the messages in the same order that the publisher creates them, without having to fall back to completely sequential Send-and-Wait-per-message logic.
OK, ISend()
a message via an ITransactedSession
, with a SOLCLIENT_OK
result. For the sake of simplicity I will assume there a single message in the transaction batch for this example.
I then call Commit()
.
If this also returns SOLCLIENT_OK
we're good, move onto the next message (batch).
My confusion lies in trying to understand how to handle the alternative Commit result possibilities. There is not much documentation on use of transactions, and none, as far as I can find, specific to the .net API.
Possibility 1: I get an OperationErrorException with a ReturnCode of SOLCLIENT_ROLLBACK
I then call GetLastSDKErrorInfo and determine that the subcode is SpoolOverQuota. This is, I believe, an error that could be overcome with a simple wait-and-retry for the Send and Commit unit.
Is that the appropriate action for SpoolOverQuota, and SpoolOverQuota the only subcode where wait-and-retry is the appropriate action? This seems to be sort of equivalent to the case where a regular ISession.Send
results in a "NACK" callback due to a queue being full, and therefore has the same retry resolution.
Everything else under SOLCLIENT_ROLLBACK seems to imply a need to reset or recreate some other state. I think in all of those cases I would simply restart the whole publisher, since trying to appropriately handle all of the other subcodes individually doesn't seem worth the trouble. (1)
Possibility 2: I get an OperationErrorException with a ReturnCode of SOLCLIENT_FAIL
What now? The documentation describes this situation thus:
"the commit may have succeeded on the message-router before the response was lost. Or the commit request may have failed (roll back)."
I take this to mean that the transaction has in fact been either committed or rolled back, and we don't know which. In other words, it does not mean that neither has happened, and thus it would be pointless to immediately retry the Commit. Is that right? And is there any way to tell, from a subsequent operation, which it was? In other words, after this occurs, is there any subsequent code I can write to determine whether it was actually a rollback and I should redo the Send and Commit unit, or it was a commit and I should move on to the next message?
(1) In my existing solution restarting the publisher would not result in data loss - the message would be picked up from the outbox and re-sent.
Best Answer
-
@allmhhuran this is an intriguing question, and I have asked around to fellow Solaceans their thought on the scenario. The general consensus is this:
Assuming all messages are valid and the queue(s) have sufficient capacity, a
pub-pub-pub-ack-nak-ack
sequence is something that should never be encountered. An intermediate failed message should only happen if the message itself is invalid (and thus would fail on republish), or the queue has insufficient space to store it (for example if messages 1 and 3 are small, and message 2 is large).That being said…
Let's assume that the intermittent NAK is just improbable, but not impossible - or that there is some extreme edge case we are overlooking, and we want something more performant than than single message
pub-wait-ack-pub-wait-ack
sequencing.In that case, I find the suggestion to use transacted sessions + an LVQ a very clever and reasonable way to accomplish this. I would, however, make some minor changes to the approach which would hopefully simplify the configuration overhead and make it even more flexible:
- During startup, provision any data queues (if needed) and add relevant topic subscriptions to them
- Create an LVQ named for the particular workflow or unit of work - but do not add topic subscriptions
- Open a queue browser to the LVQ and read data from it if any (from a previous session) - and perform corrective initialization actions as-needed
- Begin a transacted session and start publishing data messages to the appropriate topic(s)
- After n number of message or some fixed time interval or timeout (or possibly both) stop publishing data messages
- Send a special "ID" message (this could also be the same as the last sent payload) directly to the LVQ, call commit on the transacted session, and immediately read back from the LVQ browser until you receive back the message just published
- Rinse and repeat…
In this slightly modified scenario, the LVQ doesn't need to track or carry the same data topic subscriptions and only receives updates immediately before a commit operation. It also provides a convenient way to establish a restore point should the session go down and get a more direct acknowledgement that the transaction commit actually succeeded.
Assuming you are only performing this "control message / commit / readback" once every several hundred data messages, the overhead of this extra step will be negligible and very similar to the timing of just calling commit.
What are your thoughts? Can you spot any edge cases here?
If I have time tomorrow I might even draft up a code snippet that does this pattern.
1
Answers
-
One followup question on this. The documentation for
reads:
"Thrown when the operation fails. Typically this is thrown when the commit request has failed and must be rolled back. When the ReturnCode is set to SOLCLIENT_ROLLBACK, further reason for the rollback is available in the subcode"
But the documentation for SOLCLIENT_ROLLBACK reads:
"Commit() returns this when the transaction has already been rolled back."
The former says the OperationCancelledException indicates that the transaction "must be rolled back", but the latter says that the SOLCLIENT_ROLLBACK ReturnCode on that exception means it "has already been rolled back". In my original message I have taken "must be rolled back" to mean "a rollback was necessarily performed by the broker" (ie, the client should not issue a roll back) but on re-reading the documentation I'm not so sure.0 -
As a possible answer to my own question, "is there any way to tell, from a subsequent operation, which it was?", is a Last Value Queue an appropriate solution here? Presumably, if the commit actually succeeded the LVQ should have the last message in the batch, if it does not then the operation was rolled back.
It would be nice if this mechanism to "double-check-in-case-of-failure" could be made a feature of the API itself, without having to manually write up all of the logic for having the publisher also create an LVQ, and a subscription to that queue, and "manually" get messages from that subscription in case of a SOLCLIENT_FAIL response to a Commit(). Something along the lines of
ITransactedSession.EnableLastMessageCheck()
andITransactedSession.GetLastCommittedMessage()
would be nice.0 -
Hello @allmhhuran: I am still looking into the detailed characteristics of transacted sessions in the .NET SDK, and specifically how to correctly interpret and handle
SOLCLIENT_FAIL
response codes.First, I wanted to better understand the reasoning behind using transactions for order guarantees. The .NET SDK (and all of the SDKs in fact) will automatically preserve the order of individual
ISession.Send(…)
calls since internally this method synchronously copies the given message to the outgoing message buffer assumingSOLCLIENT_OK
is the return code.It is good practice to ensure each individual message is acknowledged by the broker via session events, but note that you do not have to wait for a given message Ack before sending the next one. Messages will be buffered, sent in order, queued, and acknowledged in the same order as the send method call is invoked.
The only slim edge case where messages might not be enqueued correctly might be if the queue is full and simultaneously being read from. In theory a sequence of messages could be Acked-Failed-Acked in that scenario, although I'm not 100% sure this would happen either.
Note, that just like the commit edge case, a guaranteed message could theoretically be received by the broker, written to queue, and the connection could go down that very instant and the publisher never receives the Ack. (It will, however, receive some kind of session failure and know that something went wrong).
Similar to the failed commit scenario, in this case there is no direct means of observing whether or not the message was enqueued (i.e., guaranteed messaging is at least once but technically permits duplicates). A manual workaround to this would be, as you describe, leverage an LVQ with identical subscriptions and browse it upon reconnect, but this is highly manual.
If duplicates cannot be tolerated at all, the recommended approach is to include a unique message ID at the publisher side and perform de-duplication at the consumer site (known as effectively once delivery semantics).
0 -
Hi Nicholas.
On the topic of "exactly once vs at least once" - yes, "at least once" is fine - "exactly once" being technically impossible. When I wrote that "I want to avoid sending duplicate messages", what I really meant was something like this: Exactly-once-delivery is not a strict requirement, but I don't want to consciously add logic of my own which will significantly increase the likelihood of duplicate delivery. In other words, in the case of a SOLCLIENT_FAIL on a Commit(), I would not want to always redo the entire batch send-and-commit-unit, since the batch may have been committed, which could easily result in many dozens or even hundreds of duplicate messages, not just one or two which may result from technical realities.
"First, I wanted to better understand the reasoning behind using transactions for order guarantees"
Suppose I have a publisher that has a batch of messages ready to send from its outbox. It callsSend
for each message without waiting for the acks (it processes the acks asynchronously. When an ack is received the message can be deleted from the outbox). If we waited for the acks and serialised the Sends then the send rate would at most be the inverse of the ping time… which would be on the order of 100-200 messages per second, which is not nearly fast enough.
You identified the issue in your post, but just to make the scenario explicit, suppose we have the following timeline:T=0, Send (message0)
T=1, Send (message1)
T=2, ACK (message0)
T=3, Send (message2)
T=4, NACK (message1), // nack'd for whatever reason
T=5, ACK (message2)
T=6, Send (message1) // re-sent due to NACK
T=7, Send (message3)
T=8, ACK (message1)
// …
Message 1 must be re-sent. But message 2 has already been received and ACK'd. So the order in which messages arrive at the broker, and thus arrive at subscribers, is 0, 2, 1, …, which is not correct.
In general any mechanism which sends messages individually, and processes acks asynchronously, will have this problem.Transactions seem to provide a solution to absolutely guarantee receive order while significantly increasing throughput, since (from the way I read the documentatation) the Commit is like "sending a bunch of messages all at once, with order preserved". If we send, say, 200 messages before the commit, then wait for the commit response before sending the next 200, the message rate is now at most 200 times the inverse of the ping time, or something on the order of 20000 - 40000 messages per second.
Of course, if the transaction fails then that would mean having to re-send 200 messages, not just one or two, which is expensive. But given the assumption that the vast majority of messages will be successfully pushed to the broker, an expensive but very rare failure is not a significant problem.
There is another way to deal with this problem, which is to have logic on the subscriber side which reorders messages correctly in the case of out-of-order delivery. We might be able to make claims like "we know that a message will never be more than X messages out of order, because the publisher has logic to ensure that Sends don't get too far ahead of acks".There are two issues with that approach. First, it means subscriber logic is now tightly coupled to publisher logic (it has to know to reorder, and it has to know how out-of-order something could maximally be), which to some degree undermines one of the principles of a pub-sub model.
But the second and more compelling objection is that this is simply quite difficult to do. Something like TCP can do it easily because sessions don't have to persist through host restarts and so on. But publications and subscriptions can run for years. Hosts get restarted. Things crash. This requires a message sequence number generator which is durable, it can't simply be an in-memory increment. But generating durable sequences, quickly, with no gaps, is far from trivial.
0 -
@allmhhuran this is an intriguing question, and I have asked around to fellow Solaceans their thought on the scenario. The general consensus is this:
Assuming all messages are valid and the queue(s) have sufficient capacity, a
pub-pub-pub-ack-nak-ack
sequence is something that should never be encountered. An intermediate failed message should only happen if the message itself is invalid (and thus would fail on republish), or the queue has insufficient space to store it (for example if messages 1 and 3 are small, and message 2 is large).That being said…
Let's assume that the intermittent NAK is just improbable, but not impossible - or that there is some extreme edge case we are overlooking, and we want something more performant than than single message
pub-wait-ack-pub-wait-ack
sequencing.In that case, I find the suggestion to use transacted sessions + an LVQ a very clever and reasonable way to accomplish this. I would, however, make some minor changes to the approach which would hopefully simplify the configuration overhead and make it even more flexible:
- During startup, provision any data queues (if needed) and add relevant topic subscriptions to them
- Create an LVQ named for the particular workflow or unit of work - but do not add topic subscriptions
- Open a queue browser to the LVQ and read data from it if any (from a previous session) - and perform corrective initialization actions as-needed
- Begin a transacted session and start publishing data messages to the appropriate topic(s)
- After n number of message or some fixed time interval or timeout (or possibly both) stop publishing data messages
- Send a special "ID" message (this could also be the same as the last sent payload) directly to the LVQ, call commit on the transacted session, and immediately read back from the LVQ browser until you receive back the message just published
- Rinse and repeat…
In this slightly modified scenario, the LVQ doesn't need to track or carry the same data topic subscriptions and only receives updates immediately before a commit operation. It also provides a convenient way to establish a restore point should the session go down and get a more direct acknowledgement that the transaction commit actually succeeded.
Assuming you are only performing this "control message / commit / readback" once every several hundred data messages, the overhead of this extra step will be negligible and very similar to the timing of just calling commit.
What are your thoughts? Can you spot any edge cases here?
If I have time tomorrow I might even draft up a code snippet that does this pattern.
1 -
I like the idea of publishing to an LVQ without requiring a topic subscription for the same topic being published to, because one of the premises of our current architecture is "a publisher should never subscribe to what it is publishing". This prevents possible infinite-loop scenarios. I enforce this with publish and subscribe permissions and exceptions on a client ACL. If I had to create a topic subscription for the LVQ then I would not be able to enforce those permissions anymore. Your solution is therefore objectively better for our situation, thanks.
I think it's correct to consider the nack "improbable but not impossible", and therefore have a robust solution like this in place. Principally because the most likely scenario which causes the out of order delivery is precisely the one you identified before - a queue fills up.
I don't think this scenario is dependent on the relative size of messages, although per your example that would certainly make the scenario more likely. But really, it seems safe to assume that if a queue fills up it will eventually not be full anymore (it is being drained at the other end), and the scenario you mentioned in your first response - "the queue is full and simultaneously being read from [and Sends are in flight]" - inevitably arises when that does eventually happen. Even in such a scenario the messages that are re-sent due to being nacked could, theoretically, all end up being re-sent at their correct position in the overall sequence, but that is unlikely.0 -
Here's the relevant guts of a little test implementation I have put together. I have not yet tested any failure modes (eg, by forcing the queue to fill up). But it demonstrates the idea.
What surprised me is that I am only getting ~2000 messages per second using this method, but I think that is due to the ping time between the host I am running the program on, and the broker. Worth noting that my message rate using a different implementation from this same location was significantly higher, but that was before we moved from a private cloud hosting to SaaS hosting. I am now a bit concerned about ping to SaaS hosting (provisioned in the same Azure region as our own tenancy!) 🙁
The full project is here:https://github.com/allmhuran/SolaceExperiments/tree/master/GuaranteedOrderTransactionalPublisher
private async Task PublishAsync()
{
IMessage?[] commitBatch = new IMessage[200];
// ringtail.reader is the read head of the circular message buffer, containing enqueued
// messages. This task will complete when a message becomes available
while (await _ringTail.Reader.WaitToReadAsync())
{
int retryDelayMs = 10, i = 0, j;
bool retry = false;
// grab messages until we fill the batch or there are no more immediately available
while (i < commitBatch.Length && _ringTail.Reader.TryRead(out commitBatch[i])) i++;
do
{
// send every message in the batch, and send the sequence number of the last message
// in the batch to the LVQ
for (j = 0; j < i; j++) _transactedSession.Send(commitBatch[j]);
_lvqMessage.SequenceNumber = commitBatch[--j]!.SequenceNumber;
_transactedSession.Send(_lvqMessage);
try
{
_transactedSession.Commit();
}
catch (OperationErrorException x)
{
if (x.ReturnCode == ReturnCode.SOLCLIENT_ROLLBACK)
{
// if there's no storage available then wait and retry the batch
if (ContextFactory.Instance.GetLastSDKErrorInfo().SubCode == SDKErrorSubcode.SpoolOverQuota) retry = true;
// don't bother handling any other subcode here, treat them as fatal
else throw;
}
else if (x.ReturnCode == ReturnCode.SOLCLIENT_FAIL)
{
// if the transaction was rolled back then the message in the last value queue
// will not have the same sequence number as the last message in the commti
// batch, so wait and retry the batch otherwise the commit actually succeeded
retry = _lvq.GetNext().SequenceNumber != commitBatch[j]!.SequenceNumber;
}
else throw; // something weird happened
}
if (retry) await Task.Delay(retryDelayMs *= 2);
} while (retry);
// batch committed, put messages back into free list
for (j = 0; j < i; j++) _ringHead.Writer.TryWrite(commitBatch[j]!);
}
}0