TaskCompletionSource as CorrelationKey for guaranteed messages - safe async?

allmhhuran
allmhhuran Member Posts: 47 ✭✭✭

Suppose I wish to publish guaranteed messages, but I want to wrap the solace callback guts inside a library which is a little more modern - for example, using the TAP.
Is there any reason why something like the following would be unsafe? For this example, assume send blocking is enabled (although I think this could be extended to non-blocking send? I've been looking into using ManualResetEvents set by CanSend events and so on...)

public Task<bool> Send()
{
    using var msg = SSM.ContextFactory.Instance.CreateMessage();
    // set other message properties
    var tcs = new TaskCompletionSource<bool>();
    msg.CorrelationKey = tcs;
    session.Send(msg); // check send result omitted for brevity
    return tcs.Task;
}

private void OnSessionEvent(object sender, SSM.SessionEventArgs args)
{
    switch (args.Event)
    {
        case SSM.SessionEvent.Acknowledgement:
            (args.CorrelationKey as TaskCompletionSource<bool>).SetResult(true);
            break;

        case SSM.SessionEvent.RejectedMessageError:
            (args.CorrelationKey as TaskCompletionSource<bool>).SetResult(false);
            break;
        //...
    }
}

Best Answers

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 634 admin
    #2 Answer ✓

    Blast from the past! Hello @allmhhuran ... how are you doing, I hope all is well..!

    I've been poked about this old thread, so thought I'd reply.

    Ok, so full disclosure: I'm not a .NET guy. But I did some reading into what this TaskCompletionSource object is, and the patterns that it's used for. Short answer: I'm pretty sure you can use it as you are suggesting here.

    The CorrelationKey for a published Guaranteed message can be any object, just like in JCSMP (which I'm more familiar with). Often, the message object itself is used for the CorrelationKey, but you can also use other "helper" objects or structures instead. For example, check out this one older Java sample: https://github.com/SolaceSamples/solace-samples-java-jcsmp/blob/master/src/main/java/com/solace/samples/jcsmp/features/AdPubAck.java

    Regarding blocking vs. non-blocking send... I just want to be absolutely clear that this is NOT blocking to the broker... this is the API's behaviour for blocking trying to write things to its socket. 99% of the time, you want blocking behaviour. Non-blocking is really useful if you have an ultra-high-performance single threaded application and can't afford to block for an I/O operations. The C / .NET APIs are always non-blocking to the broker. Only our JMS API provides the ability for the send() call to block until a response has been received by the broker.

    Are you looking to use this TAP pattern to implement a kind of blocking publish, where your framework will allow an app to block waiting on the receipt of a successful ACK? Something like this, which uses a CyclicBarrier to block on? (apologies for Java again) https://github.com/SolaceSamples/solace-samples-java-jcsmp/blob/master/src/main/java/com/solace/samples/jcsmp/features/AdPubAck.java

    Let me know if you're still concerned about this, it's only been a year since you posted!! :grimace: And I can ask one of our R&D .NET guys to take a look.

    Thanks!

    aaron

  • Ragnar
    Ragnar Member, Employee Posts: 65 Solace Employee
    #3 Answer ✓

    Hi @allmhhuran

    I've worked with Aaron and also with the .NET product extensively. The solution you have developed is elegant and safe. The object you pass to the API is preserved by reference internally and will be valid on the callback later.

    Regards
    Ragnar

Answers

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 634 admin
    #4 Answer ✓

    Blast from the past! Hello @allmhhuran ... how are you doing, I hope all is well..!

    I've been poked about this old thread, so thought I'd reply.

    Ok, so full disclosure: I'm not a .NET guy. But I did some reading into what this TaskCompletionSource object is, and the patterns that it's used for. Short answer: I'm pretty sure you can use it as you are suggesting here.

    The CorrelationKey for a published Guaranteed message can be any object, just like in JCSMP (which I'm more familiar with). Often, the message object itself is used for the CorrelationKey, but you can also use other "helper" objects or structures instead. For example, check out this one older Java sample: https://github.com/SolaceSamples/solace-samples-java-jcsmp/blob/master/src/main/java/com/solace/samples/jcsmp/features/AdPubAck.java

    Regarding blocking vs. non-blocking send... I just want to be absolutely clear that this is NOT blocking to the broker... this is the API's behaviour for blocking trying to write things to its socket. 99% of the time, you want blocking behaviour. Non-blocking is really useful if you have an ultra-high-performance single threaded application and can't afford to block for an I/O operations. The C / .NET APIs are always non-blocking to the broker. Only our JMS API provides the ability for the send() call to block until a response has been received by the broker.

    Are you looking to use this TAP pattern to implement a kind of blocking publish, where your framework will allow an app to block waiting on the receipt of a successful ACK? Something like this, which uses a CyclicBarrier to block on? (apologies for Java again) https://github.com/SolaceSamples/solace-samples-java-jcsmp/blob/master/src/main/java/com/solace/samples/jcsmp/features/AdPubAck.java

    Let me know if you're still concerned about this, it's only been a year since you posted!! :grimace: And I can ask one of our R&D .NET guys to take a look.

    Thanks!

    aaron

  • allmhhuran
    allmhhuran Member Posts: 47 ✭✭✭

    Hey Aaron.

    I have now been using this sort of code for quite some time and haven't run into any null reference exceptions, or deadlocks, or anything like that, so it seems like the approach works well.

    I have been writing library code that abstracts away the implementation details of the particular broker technology to make a set of interfaces that c# developers on my team can easily use.

    I have a Publisher class which wraps up the implementation details of publication - like broker sessions, sessions, session events, topics, and so on. Clients deal only with an IPublication interface, constructed by a Broker factory object, which is quite trivial:

        public interface IPublication<T>
        {
            Task<bool> PublishAsync(T payload);
        }
    

    |
    My only concern, and the reason for the question, was in regard to making sure that the TaskCompletionSource object was "safe" to push into the API client library and pull back out in the SessionEventHandler passed to the solace IContext.CreateSession.

    private void SessionEventHandler(object? sender, SSM.SessionEventArgs args)
    {
        switch (args.Event)
        {
            case SSM.SessionEvent.Acknowledgement:
                (args.CorrelationKey as TaskCompletionSource<bool>)?.SetResult(true);
                break;
    
            case SSM.SessionEvent.RejectedMessageError:
                (args.CorrelationKey as TaskCompletionSource<bool>)?.SetResult(false);
                logger?.LogWarning($"{this} {args.Event} {args.Info}");
                break;
          // ...
       }
    }
    

    The key point is getting that PublishAsync method into the Task-returning async form. This sort of callback wrapping to create a TAP-like interface is fairly standard in c#. Here's my send implementation (just creating new messages here rather than using a message ringbuffer or anything like that for example code brevity):

    public Task<bool> PublishAsync(T payload)
    {
        try
        {
            using (var msg = solaceFactory.CreateMessage())
            {
                msg.Destination = topic;
                msg.DeliveryMode = SSM.MessageDeliveryMode.Persistent;
                msg.TimeToLive = TimeSpan.FromDays(2).Milliseconds;
                msg.BinaryAttachment = serializer.Serialize(payload);
                // adding a taskcompletionsource to the local message allows us to use the solace
                // send/callback async pattern in TAP async/await form
                var tcs = new TaskCompletionSource<bool>();
                msg.CorrelationKey = tcs;
                return session.Send(msg) == SSM.ReturnCode.SOLCLIENT_OK ? tcs.Task : Task.FromResult(false);
            }
        }
        catch (Exception x)
        {
            logger?.LogError($"Exception when publishing to {topic.Name}: {x.Message}\nMessage payload: {payload}");
            return Task.FromResult(false);
        }
    }
    

    This in itself makes no claims or promises about send concurrency, that's a decision for the consumer of the IPublication. For instance, you could serialise messages:

    foreach (var message in messageDTOCollection)
    {
       await publication.SendAsync(message);
    }
    

    Or you could parallelise them using any number of patterns or technologies, like System.Threading.Tasks.Dataflow, or a Task buffer and semaphore, or channels, etc. These are decisions for particular publishers and subscribers to make.

  • Ragnar
    Ragnar Member, Employee Posts: 65 Solace Employee
    #6 Answer ✓

    Hi @allmhhuran

    I've worked with Aaron and also with the .NET product extensively. The solution you have developed is elegant and safe. The object you pass to the API is preserved by reference internally and will be valid on the callback later.

    Regards
    Ragnar