Subscribe to direct messages with RequestConfirm and correlationKey (.NET)

radekm Member Posts: 23

Hi,

I'm using .NET. I would like to subscribe to direct messages.

I have set the session property SubscribeBlocking to false. I plan to use a TaskCompletionSource as the correlationKey and complete it when handling SubscriptionOk or SubscriptionError.

Unfortunately, I can't find a suitable overload of ISession.Subscribe.

There are 3 overloads, and only 2 of them let me specify a correlationKey. One of those takes an IDispatchTarget, but I have topic dispatch turned off. The last candidate needs an IEndpoint, and I don't know where to get an IEndpoint for the current session (null doesn't work).

Thanks

Best Answer

  • nicholasdgoodman
    nicholasdgoodman Member, Employee Posts: 42 Solace Employee
    #2 Answer ✓

    At a glance it appears your observations are correct - supplying a subscription correlationKey is only available when using a topic dispatch target or when subscribing directly to an IQueue or IClientName instance.

    One workaround is to enable topic dispatch. If you are not otherwise interested in that feature, you can wrap the global message handler and pass it in as the topic handler for every subscription:

    var sessionProperties = new SessionProperties()
    {
      // other...
      SubscribeBlocking = false,
      TopicDispatch = true
    };
    
    using var context = ContextFactory.Instance.CreateContext(contextProperties, null);
    using var session = context.CreateSession(sessionProperties, MessageEvent, SessionEvent);
    
    session.Connect();
    
    using var topic = ContextFactory.Instance.CreateTopic(topicString);
    
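    // This could be wrapped up into a helper or extension method, .SubscribeAsync(topic)
    // RunContinuationsAsynchronously keeps awaiting code off the Solace context thread.
    var tcs = new TaskCompletionSource<ReturnCode>(TaskCreationOptions.RunContinuationsAsynchronously);
    // Route messages for this topic back to the global message handler.
    var target = session.CreateDispatchTarget(topic, (s,e) => MessageEvent(session, e));
    session.Subscribe(target, SubscribeFlag.RequestConfirm, tcs);
    
    // Now it is awaitable
    var result = await tcs.Task;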
    

    Meanwhile elsewhere:

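    private void SessionEvent(object sender, SessionEventArgs e)
    {
      // The correlation key passed to Subscribe comes back on the session event.
      var tcs = e.CorrelationKey as TaskCompletionSource<ReturnCode>;
    
      switch(e.Event)
      {
        // Namespace-qualified because this handler method shares the enum's name.
        case SolaceSystems.Solclient.Messaging.SessionEvent.SubscriptionOk:
          tcs?.TrySetResult(ReturnCode.SOLCLIENT_OK);
          break;
        case SolaceSystems.Solclient.Messaging.SessionEvent.SubscriptionError:
          tcs?.TrySetResult(ReturnCode.SOLCLIENT_FAIL); // Or .TrySetException(...)
          break;
      }
    }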
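
    The helper mentioned in the comment above could look roughly like the sketch below. SubscribeAsync and SessionSubscribeExtensions are hypothetical names, not part of the Solace API. The sketch assumes the session was created with SubscribeBlocking = false and TopicDispatch = true, that the session event handler completes the TaskCompletionSource as shown above, and that a non-blocking Subscribe returns SOLCLIENT_OK or SOLCLIENT_IN_PROGRESS when the request was accepted (worth checking against the API reference).

    ```csharp
    using System;
    using System.Threading.Tasks;
    using SolaceSystems.Solclient.Messaging;

    // Hypothetical helper, not part of the Solace API.
    public static class SessionSubscribeExtensions
    {
        public static Task<ReturnCode> SubscribeAsync(
            this ISession session,
            ISubscription subscription,
            EventHandler<MessageEventArgs> messageHandler)
        {
            // Run continuations asynchronously so awaiting code does not
            // execute on (and block) the Solace context thread.
            var tcs = new TaskCompletionSource<ReturnCode>(
                TaskCreationOptions.RunContinuationsAsynchronously);

            // The dispatch target ties the subscription to a message handler;
            // the TaskCompletionSource travels along as the correlation key.
            IDispatchTarget target =
                session.CreateDispatchTarget(subscription, messageHandler);
            ReturnCode rc =
                session.Subscribe(target, SubscribeFlag.RequestConfirm, tcs);

            // Assumption: any other return code means the request was not
            // sent, so no SubscriptionOk/SubscriptionError event will follow.
            if (rc != ReturnCode.SOLCLIENT_OK &&
                rc != ReturnCode.SOLCLIENT_IN_PROGRESS)
            {
                tcs.TrySetResult(rc);
            }

            return tcs.Task;
        }
    }
    ```

    Usage would then be something like: var result = await session.SubscribeAsync(topic, (s, e) => MessageEvent(session, e));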
    
