Allowing a publisher to see the last message it published while preventing it from subscribing to its own topic

allmhhuran (Member)
November 2024 in General Discussions

I'm a bit stuck trying to figure out the best way to combine some features I'd like to provide in our in-house broker abstraction library.

I have an IPublication<T> interface, which exposes a "simple" API to consumers: Task PublishAsync(T message, CancellationToken ct) and event Action<T>? MessagePublished. So far so good.

I'd like to add an additional method, Task<T> LastMessagePublished().
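To make the shape of that API concrete, here is a rough Python analogue (the library itself is C#; this is not that code). The class name InMemoryPublication and the snake_case method names are hypothetical, the C# event is modelled as a list of callbacks, and the CancellationToken is left out. It also shows the limitation that motivates the rest of the thread: a value held only in the publisher's own memory is lost if the process crashes.

```python
import asyncio
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")


class InMemoryPublication(Generic[T]):
    """Hypothetical Python analogue of IPublication<T>; not the author's C# code."""

    def __init__(self) -> None:
        # Stands in for the C# `event Action<T>? MessagePublished`.
        self.message_published: List[Callable[[T], None]] = []
        self._last: Optional[T] = None

    async def publish_async(self, message: T) -> None:
        await asyncio.sleep(0)  # placeholder for the real broker send
        self._last = message
        for handler in self.message_published:
            handler(message)

    async def last_message_published(self) -> Optional[T]:
        # Process-local only: this value is gone if the publisher crashes,
        # which is why a broker-side store (e.g. an LVQ) is being considered.
        return self._last


async def main():
    pub: InMemoryPublication[str] = InMemoryPublication()
    seen: List[str] = []
    pub.message_published.append(seen.append)
    await pub.publish_async("a")
    await pub.publish_async("b")
    return seen, await pub.last_message_published()


print(asyncio.run(main()))  # (['a', 'b'], 'b')
```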

A last value queue (LVQ) seems like a good approach.

I used this approach when experimenting with transactional publication, in which I published a batch of messages to a topic and also pushed to the LVQ, all within the same transaction (sample implementation on GitHub). This works, but performance is limited by the transactional approach, so I don't want to use it.

The other option would be to simply subscribe to the publication topic. However, this conflicts with another feature of the implementation: I don't want to allow publishers to subscribe to their own topics. I enforce this using the publisher's ACL on the broker, which ensures that no publisher can accidentally put itself into an infinite loop.

So I find myself a bit stuck here. Any thoughts on how to provide a "last message published" feature to my IPublication without having to ditch my ACL enforcement?

Answers

  • Aaron (Member, Administrator, Moderator, Employee)
    November 2024

    Hi @allmhhuran, interesting thoughts and approaches. Thanks for posting; this seems like the perfect kind of topic for a Community discussion.

    LVQs are great for keeping just the last message of something… the feature was primarily developed to let a publisher that reconnects after losing all state (e.g. after a crash) find out the last message it sent that the broker successfully received. The envisioned design uses the topic hierarchy: one topic level is dedicated to the publisher ID, and there is one LVQ per publisher (per publisher thread?) subscribed to that publisher's topics, something like */*/pubId234/>. That way, no matter what kind of message it is (wildcards everywhere), the LVQ always keeps the last message sent by this publisher (ID #234).
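    A toy Python model of that idea, assuming simplified matching rules: a whole-level * matches exactly one level, a trailing > matches one or more remaining levels, and Solace's prefix wildcards (e.g. ord*) are ignored. The matches function and LastValueQueue class are hypothetical and only mimic the keep-the-newest-message behaviour; they are not a Solace API.

```python
from collections import deque
from typing import Deque, Optional, Tuple


def matches(subscription: str, topic: str) -> bool:
    """Simplified Solace-style matching: a whole-level '*' matches exactly one
    level, and a trailing '>' matches one or more remaining levels.
    Prefix wildcards such as 'ord*' are deliberately not modelled."""
    sub_levels = subscription.split("/")
    topic_levels = topic.split("/")
    for i, level in enumerate(sub_levels):
        if level == ">" and i == len(sub_levels) - 1:
            return len(topic_levels) > i  # at least one level must remain
        if i >= len(topic_levels):
            return False  # topic is shorter than the subscription
        if level != "*" and level != topic_levels[i]:
            return False
    return len(sub_levels) == len(topic_levels)


class LastValueQueue:
    """Toy LVQ (hypothetical): holds only the newest matching message."""

    def __init__(self, subscription: str) -> None:
        self.subscription = subscription
        self._slot: Deque[Tuple[str, str]] = deque(maxlen=1)  # room for one

    def offer(self, topic: str, payload: str) -> None:
        if matches(self.subscription, topic):
            self._slot.append((topic, payload))  # evicts the previous message

    def browse(self) -> Optional[Tuple[str, str]]:
        # Browsing reads without consuming, so the value stays available.
        return self._slot[0] if self._slot else None


lvq = LastValueQueue("*/*/pubId234/>")
lvq.offer("orders/new/pubId234/eu", "order 1")
lvq.offer("prices/fx/pubId999/usd", "other publisher, not captured")
lvq.offer("trades/done/pubId234/us/7", "trade 7")
print(lvq.browse())  # ('trades/done/pubId234/us/7', 'trade 7')
```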

    I've never thought of using the LVQ as part of a transaction… I guess that could work just fine, but it would not be as performant.

    But regarding your concern around ACLs… I don't think it matters here? I can have an app whose subscription ACL denies */*/pubId234/> and it can still browse an LVQ that has that specific subscription. Have you tried this and run into issues?

  • allmhhuran (Member)
    November 2024

    Hey @Aaron ,

    Regarding the "purpose" of the LVQ in the Solace design: yep, that's precisely my intent. I want to give users of my library the ability to see the last message they published, in case they terminate ungracefully for some reason.

    The difficulty I'm facing probably comes down to how we organise our topics, but the current organisation "makes sense" for our use case.

    We're doing Event-Carried State Transfer (ECST) via pub-sub, using a Common Data Model (CDM).

    Each topic represents events related to one CDM entity, and an event is published whenever data about some entity instance changes.

    There's exactly one system of record for any CDM entity.

    Zero or more satellite systems may store a local copy of that entity data (using an eventual consistency model, thus taking read load off the system of record, ensuring availability of satellites even if the system of record is unavailable, and allowing satellite systems to structure the data according to their own domain models).

    For example, suppose the ERP is the system of record for "Customers". There is therefore a topic called "ECST/Customer".

    The system of record is the only system with ACL permission to publish to that topic, and all messages on that topic are in the shape of the Customer common data model (the message content is just a serialised C# DTO, with all CDM DTOs defined in a NuGet package).

    The system of record is also given an ACL restriction which prevents it from subscribing to ECST/Customer. The reason for this is that if the system of record subscribed to its own publication, the following scenario could happen:

    1. Customer changed → system of record publishes an event related to that change.
    2. System of record then receives that event.
    3. System of record updates its own copy of the entity.
    4. Goto 1.
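    A toy simulation of that cycle, assuming a hypothetical in-process ToyBroker whose per-client subscribe deny list stands in for the broker ACL. Without the deny entry, the system of record keeps re-publishing its own events until the delivery cap stops it; with it, the subscription is refused and the cycle never starts.

```python
from collections import deque
from typing import Any, Callable, Deque, Dict, List, Set, Tuple

Handler = Callable[[Any], None]


class ToyBroker:
    """Hypothetical in-process broker with a per-client subscribe deny list
    standing in for the broker's subscribe ACL."""

    def __init__(self) -> None:
        self.subscribers: Dict[str, List[Handler]] = {}
        self.deny_subscribe: Dict[str, Set[str]] = {}
        self.pending: Deque[Tuple[str, Any]] = deque()

    def subscribe(self, client: str, topic: str, handler: Handler) -> bool:
        if topic in self.deny_subscribe.get(client, set()):
            return False  # ACL refuses the subscription
        self.subscribers.setdefault(topic, []).append(handler)
        return True

    def publish(self, topic: str, payload: Any) -> None:
        self.pending.append((topic, payload))

    def run(self, max_deliveries: int = 10) -> int:
        """Deliver queued messages; stop at max_deliveries so a loop terminates."""
        delivered = 0
        while self.pending and delivered < max_deliveries:
            topic, payload = self.pending.popleft()
            for handler in self.subscribers.get(topic, []):
                handler(payload)
                delivered += 1
        return delivered


def system_of_record(broker: ToyBroker, acl_enabled: bool) -> int:
    if acl_enabled:
        broker.deny_subscribe["erp"] = {"ECST/Customer"}

    def on_customer_event(payload: Any) -> None:
        # Steps 2-3: receive own event and "update" the local copy, which the
        # system of record treats as a change and publishes again (step 1).
        broker.publish("ECST/Customer", payload)

    broker.subscribe("erp", "ECST/Customer", on_customer_event)
    broker.publish("ECST/Customer", {"id": 42, "name": "Acme"})  # step 1
    return broker.run()


print(system_of_record(ToyBroker(), acl_enabled=False))  # 10 (capped loop)
print(system_of_record(ToyBroker(), acl_enabled=True))   # 0
```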

    There are programmatic ways to avoid this in any single system of record's implementation, but it's a lot neater to just prevent it from ever happening by controlling it at the ACL level.
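    One such programmatic guard, sketched in Python under the assumption that the system of record publishes only when an update actually changes its stored state (CustomerStore and upsert are hypothetical names). An echoed event then produces no change and no new event, which breaks the cycle at step 3; the catch is that every system of record has to implement this correctly.

```python
from typing import Any, Callable, Dict

Row = Dict[str, Any]


class CustomerStore:
    """Hypothetical system-of-record store that emits an event only when an
    upsert actually changes the stored row."""

    def __init__(self, publish: Callable[[Row], None]) -> None:
        self._rows: Dict[int, Row] = {}
        self._publish = publish

    def upsert(self, row: Row) -> bool:
        if self._rows.get(row["id"]) == row:
            return False  # no change, so no event: an echoed event stops here
        self._rows[row["id"]] = dict(row)  # store a copy, not the caller's dict
        self._publish(row)
        return True


outbox = []
store = CustomerStore(outbox.append)
print(store.upsert({"id": 42, "name": "Acme"}))  # True: real change, published
print(store.upsert(outbox[-1]))                  # False: echo of our own event
print(len(outbox))                               # 1
```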



  • allmhhuran (Member)
    November 2024

    … but thinking about your wildcard idea, there might be a solution.

    In my implementation, neither publishers nor subscribers know the actual topic names. They don't even know they're interacting with Solace. Clients just interact with an IPublication<T>, where T is some CDM type, and the actual topic name is derived from T by the internals of my own (Solace-specific) Publication<T> : IPublication<T> library. Similarly, I have a NuGet package containing a (Solace-specific) Subscription<T> : ISubscription<T> for subscribers to use.


    So there might be some way I can be "tricky" with wildcards in the implementation, since I get to decide which topics publishers and subscribers are actually referring to when interacting with a T.
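    A sketch of the kind of type-to-topic mapping described above, assuming the topic is simply a root level plus the type name (TOPIC_ROOT, topic_for and the optional publisher level are hypothetical, not the actual library internals). Because clients never see the string, an extra level such as a publisher ID, as in the */*/pubId234/> example earlier in the thread, could be introduced without touching client code.

```python
from dataclasses import dataclass
from typing import Optional, Type

TOPIC_ROOT = "ECST"  # hypothetical root level, matching the ECST/Customer example


def topic_for(entity_type: Type, publisher_id: Optional[str] = None) -> str:
    """Derive the topic from the CDM type so clients never handle topic strings.
    The optional publisher level is a hypothetical extension, not current behaviour."""
    topic = f"{TOPIC_ROOT}/{entity_type.__name__}"
    return f"{topic}/{publisher_id}" if publisher_id else topic


@dataclass
class Customer:  # stand-in for a CDM DTO
    id: int
    name: str


print(topic_for(Customer))         # ECST/Customer
print(topic_for(Customer, "erp"))  # ECST/Customer/erp
```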

  • Aaron (Member, Administrator, Moderator, Employee)

    Hey @allmhhuran, Happy New Year! I just stumbled back onto this post and thought I'd mention something I recently figured out: you can use Topic Endpoints as Last Value endpoints as well… not just LVQs!

    Why this is interesting: you can use both a topic subscription and a selector as part of the Last Value Topic Endpoint (LVTE) config! Selectors behave differently in Queues vs. Topic Endpoints… with the former, they're applied at egress (after the queue, on the way to the client), which means non-matching messages stay in the queue; with Topic Endpoints, selector filtering is applied at ingress. So only messages matching both the subscription and the selector make it into the TE.
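    A toy Python model of the difference described above, assuming a single-message slot in both endpoint types and a selector reduced to a predicate over user properties (roughly what a selector such as author = 'alice' would test). The class names are hypothetical. With the selector at egress, a newer non-matching message occupies the queue's only slot; with the selector at ingress, the topic endpoint keeps the last match.

```python
from typing import Callable, Dict, Optional

Message = Dict[str, str]  # only user properties, for brevity
Selector = Callable[[Message], bool]


class SelectorLastValueQueue:
    """Toy LVQ: selector applied at egress, so the single slot always holds
    the newest message, whether or not it matches."""

    def __init__(self, selector: Selector) -> None:
        self.selector = selector
        self.slot: Optional[Message] = None

    def ingress(self, msg: Message) -> None:
        self.slot = msg  # every message replaces the previous one

    def deliver(self) -> Optional[Message]:
        if self.slot is not None and self.selector(self.slot):
            return self.slot
        return None  # a non-matching message just sits in the slot


class SelectorLastValueTopicEndpoint:
    """Toy LVTE: selector applied at ingress, so only matches replace the slot."""

    def __init__(self, selector: Selector) -> None:
        self.selector = selector
        self.slot: Optional[Message] = None

    def ingress(self, msg: Message) -> None:
        if self.selector(msg):
            self.slot = msg

    def deliver(self) -> Optional[Message]:
        return self.slot


def by_alice(m: Message) -> bool:
    return m.get("author") == "alice"  # roughly: author = 'alice'


lvq = SelectorLastValueQueue(by_alice)
lvte = SelectorLastValueTopicEndpoint(by_alice)
for msg in ({"author": "alice", "seq": "1"}, {"author": "bob", "seq": "2"}):
    lvq.ingress(msg)
    lvte.ingress(msg)

print(lvq.deliver())   # None
print(lvte.deliver())  # {'author': 'alice', 'seq': '1'}
```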

    I hope you're aware of the performance limitations of selectors in Solace (they're significantly slower than topic filtering), which is why we always recommend using topic filtering where possible (e.g. add the author name as a topic level and filter on it with a topic subscription, rather than using a selector on the user property "author").

    BUT if your rates are low, and/or you cannot have your filtering done solely by topic (as a queue-based LVQ requires), then maybe this is an option… especially if your messages already have the headers or properties populated that you want to filter on. https://docs.solace.com/API/API-Developer-Guide/Using-Selectors.htm

    Just a thought for you, in case it helps. Obviously this wouldn't scale very well to 100s or 1000s of endpoints keeping track of the last message on a whole bunch of different topics/selectors: a last-value cache (like PubSub+ Cache) is better suited to that.
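    To illustrate the difference in shape, here is a minimal Python sketch of the last-value-cache idea: one store holding the newest payload per distinct topic, instead of one endpoint per topic or selector. It is an illustration only; PubSub+ Cache has its own configuration and request API, none of which is modelled here, and LastValueCache is a hypothetical name.

```python
from typing import Dict, Optional


class LastValueCache:
    """Toy last-value cache (hypothetical): newest payload per distinct topic."""

    def __init__(self) -> None:
        self._by_topic: Dict[str, str] = {}

    def on_message(self, topic: str, payload: str) -> None:
        self._by_topic[topic] = payload  # newest value wins, per topic

    def last(self, topic: str) -> Optional[str]:
        return self._by_topic.get(topic)

    def __len__(self) -> int:
        return len(self._by_topic)


cache = LastValueCache()
for topic, payload in [("news/sport", "v1"),
                       ("news/weather", "v1"),
                       ("news/sport", "v2")]:
    cache.on_message(topic, payload)

print(cache.last("news/sport"), cache.last("news/weather"), len(cache))  # v2 v1 2
```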

    That's all. Hope you're well!