Allowing a publisher to see the last message published while preventing subscription
I'm a bit stuck trying to figure out the best way to combine some features I'd like to provide in our in-house broker abstraction library.
I have an IPublication<T> interface, which exposes a "simple" API to consumers: Task PublishAsync(T message, CancellationToken ct) and event Action<T>? MessagePublished. So far so good.
I'd like to add an additional method, Task<T> LastMessagePublished().
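For reference, the interface described above would look roughly like the sketch below. The three member signatures are taken from the post; the comments are an interpretation of what each member is for, not a statement of how the library behaves.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Sketch only: member signatures as quoted in the post,
// comments are assumptions about intent.
public interface IPublication<T>
{
    // Publishes one message for the publication associated with T.
    Task PublishAsync(T message, CancellationToken ct);

    // Notifies local listeners about a published message (assumed semantics).
    event Action<T>? MessagePublished;

    // Proposed addition: fetch the last message the broker holds for this publication.
    Task<T> LastMessagePublished();
}
```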
A last value queue (LVQ) seems like a good approach.
I used this approach when playing with transactional publication, wherein I published a batch of messages to a topic, as well as pushing to the LVQ, in the same transaction (sample implementation on GitHub). This is functional, but performance is limited by the transactional approach, so I don't want to use this.
The other option would be to simply subscribe to the publication topic. However, this conflicts with another feature of the implementation - I don't want to allow publishers to subscribe to their own topics. I enforce this using the publisher's ACL on the broker. This way I can ensure that no publisher can accidentally put themselves into an infinite loop.
So I find myself a bit stuck here. Any thoughts on how to provide a "last message published" feature to my IPublication without having to ditch my ACL enforcement?
Answers
Hi @allmhhuran, interesting thoughts and approaches. Thanks for posting, seems like the perfect kind of topic for a Community discussion.
LVQs are great for keeping just the last message of something… The LVQ was primarily developed as a feature to allow reconnecting publishers that have lost all state (e.g. crashed) to figure out the last message they sent that the broker successfully received. It was envisioned that this would be done by using topics, having a single level of the topic hierarchy dedicated to the publisher ID, and then having an LVQ per publisher (per publisher thread?) subscribed to that publisher's topics. Something like:
*/*/pubId234/>
so no matter what kind of message (wildcards everywhere), it will always keep the last message sent by this publisher (ID #234).
I've never thought of using the LVQ as part of a transaction… I guess that could work just fine, but would not be as performant.
But your concern around ACLs… I don't think it matters here? I can have an app with a subscription ACL of
*/*/pubId234/>
and I can still browse an LVQ that has that specific subscription. Have you tried this and run into issues?
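To make the wildcard pattern concrete, here is a minimal sketch of how a subscription like */*/pubId234/> selects topics. It is a simplified matcher, not the broker's implementation: it only treats * as a whole-level wildcard and > as a trailing "one or more levels" wildcard, and it ignores prefix wildcards such as abc*. The class name and the example topics in the usage note are hypothetical.

```csharp
using System;

public static class TopicMatch
{
    // Simplified matching rules (assumption, see lead-in):
    //   "*" as a whole level matches exactly one level,
    //   ">" as the final level matches one or more remaining levels.
    public static bool Matches(string subscription, string topic)
    {
        string[] sub = subscription.Split('/');
        string[] top = topic.Split('/');

        for (int i = 0; i < sub.Length; i++)
        {
            if (sub[i] == ">" && i == sub.Length - 1)
                return top.Length > i;      // at least one topic level must remain
            if (i >= top.Length)
                return false;               // topic has fewer levels than the subscription
            if (sub[i] != "*" && sub[i] != top[i])
                return false;               // literal level does not match
        }
        return sub.Length == top.Length;    // reject topics with extra trailing levels
    }
}
```

With this sketch, Matches("*/*/pubId234/>", "ECST/Customer/pubId234/updated") is true, while the same pattern rejects "ECST/Customer/pubId999/updated" (different publisher ID) and "ECST/Customer" (no publisher ID level at all). The pattern only works if the publisher ID is actually part of the published topic.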
Hey @Aaron,
Regarding the "purpose" of the LVQ in the Solace design - yep, that's precisely my intent. Provide a user of my library the ability to see the last message published in case they ungracefully terminate for some reason.
The difficulty I am facing probably comes down to how we are organising our topics, but the current organisation "makes sense" for our current use case design.
We're doing Event Carried State Transfer via pub-sub using a Common Data Model.
Each topic represents events related to one CDM entity, and an event is published whenever data about some entity instance changes.
There's exactly one system of record for any CDM entity.
Zero or more satellite systems may store a local copy of that entity data (using an eventual consistency model, thus taking read load off the system of record, ensuring availability of satellites even if the system of record is unavailable, and allowing satellite systems to structure the data according to their own domain models).
For example, suppose the ERP is the system of record for "Customers". There is therefore a topic called "ECST/Customer".
The system of record is the only system with the ACL permission to publish to that topic, and all messages on that topic are in the shape of the Customer common data model (the message content is just a serialised C# DTO, with all CDM DTOs defined in a NuGet package).
The system of record is also given an ACL restriction which prevents it from subscribing to ECST/Customer. The reason for this is that if the system of record subscribed to its own publication, the following scenario could happen:
1. Customer changed → system of record publishes an event related to that change.
2. System of record then receives that event.
3. System of record updates its own copy of the entity.
4. Goto 1.
There are programmatic ways to avoid this in any single system of record's implementation, but it's a lot neater to just prevent it from ever happening by controlling it at the ACL level.
… but thinking about your idea using wildcards, there might be a solution.
In my implementation, neither publishers nor subscribers know the actual topic names. They don't even know they're interacting with Solace. Clients just interact with an IPublication<T>, where T is some CDM type, and the actual topic name is derived from T by my own (Solace-specific) Publication<T> : IPublication<T> library internals. Similarly, I have a NuGet package for a (Solace-specific) Subscription<T> : ISubscription<T> for subscribers to use.
So there might be some way I can be "tricky" with wildcards in the implementation, since I get to decide which topics publishers and subscribers are actually referring to when interacting with a T.
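One way this could look, purely as a sketch: since the library owns the mapping from T to a topic, it could append a publisher ID level and an event-kind level to the existing "ECST/<entity>" prefix. The four-level layout, the TopicNames class, the method names, and the Customer placeholder type below are all hypothetical; nothing here is taken from the actual library.

```csharp
using System;

// Hypothetical topic layout: ECST/<entity>/<publisherId>/<eventKind>
public static class TopicNames
{
    // Topic a Publication<T> would actually publish to.
    public static string PublishTopic<T>(string publisherId, string eventKind) =>
        $"ECST/{typeof(T).Name}/{publisherId}/{eventKind}";

    // Pattern a Subscription<T> would use: every publisher and event kind for one entity.
    public static string SubscriberPattern<T>() =>
        $"ECST/{typeof(T).Name}/>";

    // Pattern for a per-publisher LVQ, following the */*/pubId/> shape suggested above.
    public static string LastValuePattern(string publisherId) =>
        $"*/*/{publisherId}/>";
}

// Placeholder standing in for a CDM DTO.
public sealed class Customer { }
```

Under these assumptions, TopicNames.PublishTopic<Customer>("pubId234", "changed") yields "ECST/Customer/pubId234/changed", which falls under both "ECST/Customer/>" for subscribers and "*/*/pubId234/>" for the publisher's LVQ. Whether the publisher's ACL can stay as strict as it is today with this layout would still need to be checked against the broker.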