Hey Aaron.
I have now been using this sort of code for quite some time and haven’t run into any null reference exceptions, or deadlocks, or anything like that, so it seems like the approach works well.
I have been writing library code that abstracts away the implementation details of the particular broker technology and exposes a set of interfaces that C# developers on my team can easily use.
I have a Publisher class which wraps up the implementation details of publication, like broker sessions, sessions, session events, topics, and so on. Clients deal only with an IPublication interface, constructed by a Broker factory object, which is quite trivial:
public interface IPublication<T>
{
    Task<bool> PublishAsync(T payload);
}
My only concern, and the reason for the question, was in regard to making sure that the TaskCompletionSource object was “safe” to push into the API client library and pull back out in the SessionEventHandler passed to the Solace IContext.CreateSession.
private void SessionEventHandler(object? sender, SSM.SessionEventArgs args)
{
    switch (args.Event)
    {
        case SSM.SessionEvent.Acknowledgement:
            (args.CorrelationKey as TaskCompletionSource<bool>)?.SetResult(true);
            break;
        case SSM.SessionEvent.RejectedMessageError:
            (args.CorrelationKey as TaskCompletionSource<bool>)?.SetResult(false);
            logger?.LogWarning($"{this} {args.Event} {args.Info}");
            break;
        // ...
    }
}
The key point is getting that PublishAsync method into the Task-returning async form. This sort of callback wrapping to create a TAP-like interface is fairly standard in C#. Here’s my send implementation (to keep the example short, it just creates a new message per send rather than using a message ring buffer or anything like that):
public Task<bool> PublishAsync(T payload)
{
    try
    {
        using (var msg = solaceFactory.CreateMessage())
        {
            msg.Destination = topic;
            msg.DeliveryMode = SSM.MessageDeliveryMode.Persistent;
            // TotalMilliseconds is the whole duration; Milliseconds is only the
            // 0-999 component of the TimeSpan, which is 0 for exactly two days
            msg.TimeToLive = (long)TimeSpan.FromDays(2).TotalMilliseconds;
            msg.BinaryAttachment = serializer.Serialize(payload);
            // adding a taskcompletionsource to the local message allows us to use the solace
            // send/callback async pattern in TAP async/await form;
            // RunContinuationsAsynchronously keeps awaiting code off the thread that raises the session event
            var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
            msg.CorrelationKey = tcs;
            return session.Send(msg) == SSM.ReturnCode.SOLCLIENT_OK ? tcs.Task : Task.FromResult(false);
        }
    }
    catch (Exception x)
    {
        logger?.LogError($"Exception when publishing to {topic.Name}: {x.Message}\nMessage payload: {payload}");
        return Task.FromResult(false);
    }
}
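Stripped of the Solace specifics, the callback wrapping looks roughly like the sketch below. FakeSession and FakePublisher are hypothetical stand-ins I made up for illustration, not Solace types; the only real API used is TaskCompletionSource. Passing TaskCreationOptions.RunContinuationsAsynchronously is a precaution on my part so that code awaiting the task does not resume on the thread raising the callback.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for a broker session: Send returns immediately and the
// outcome arrives later through an event that hands the correlation key back.
public sealed class FakeSession
{
    public event Action<object, bool>? Completed;

    public void Send(string payload, object correlationKey)
    {
        // Simulate the broker responding on another thread; empty payloads are rejected.
        Task.Run(() => Completed?.Invoke(correlationKey, payload.Length > 0));
    }
}

// Hypothetical publisher showing the callback-to-Task wrapping.
public sealed class FakePublisher
{
    private readonly FakeSession session;

    public FakePublisher(FakeSession session)
    {
        this.session = session;
        // Complete whichever TaskCompletionSource travelled with the message.
        session.Completed += (key, accepted) =>
            (key as TaskCompletionSource<bool>)?.TrySetResult(accepted);
    }

    public Task<bool> PublishAsync(string payload)
    {
        // Continuations run asynchronously rather than inline on the callback thread.
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        session.Send(payload, tcs);
        return tcs.Task;
    }
}
```

A caller would then just write something like await publisher.PublishAsync("hello") and get true or false once the fake session responds.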
This in itself makes no claims or promises about send concurrency; that’s a decision for the consumer of the IPublication. For instance, you could send messages strictly one at a time, awaiting each acknowledgement before sending the next:
foreach (var message in messageDTOCollection)
{
    await publication.PublishAsync(message);
}
Or you could parallelise them using any number of patterns or technologies, like System.Threading.Tasks.Dataflow, or a Task buffer and semaphore, or channels, etc. These are decisions for particular publishers and subscribers to make.
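As one illustration of the semaphore option, here is a minimal sketch that caps how many sends are awaiting acknowledgement at once. BoundedPublisher, PublishAllAsync and maxInFlight are names I invented for the example; it takes a plain delegate rather than an IPublication so it stands alone, and it is only one of several reasonable ways to do this.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of any library.
public static class BoundedPublisher
{
    // Publishes every item with at most maxInFlight sends awaiting acknowledgement,
    // and returns the per-message results in input order.
    public static async Task<bool[]> PublishAllAsync<T>(
        IEnumerable<T> items, Func<T, Task<bool>> publishAsync, int maxInFlight)
    {
        using var gate = new SemaphoreSlim(maxInFlight);
        var pending = new List<Task<bool>>();

        foreach (var item in items)
        {
            await gate.WaitAsync();           // wait for a free slot before sending
            pending.Add(SendOneAsync(item));
        }

        // Task.WhenAll preserves the order of the tasks it was given.
        return await Task.WhenAll(pending);

        async Task<bool> SendOneAsync(T item)
        {
            try { return await publishAsync(item); }
            finally { gate.Release(); }       // free the slot once acknowledged or failed
        }
    }
}
```

With the interface above, a call might look like await BoundedPublisher.PublishAllAsync(messageDTOCollection, publication.PublishAsync, 16), where 16 is an arbitrary limit you would tune for your own broker and message sizes.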