Solace SpringCloudStream PubSub model using durable topic endpoint

Ravi
Ravi Member Posts: 12

We have a requirement to stop listening to a queue when any error occurs during processing.

We are on Kubernetes and run many instances of an application that all listen to the same queue, so if I try to stop listening to the queue by calling the application through its service name, only one instance stops; the others keep listening to the queue.

This is because Kubernetes uses a load balancer (LB), which sends each request to only one instance of the application.

To overcome this issue we want to use the Solace pub/sub model: if any error occurs while processing messages, we publish one message to a durable topic endpoint. All instances subscribed to this durable topic receive the message and, depending on its content, stop/pause or start/resume listening to the queue.

We are using Solace with Spring Cloud Stream. I need some samples showing how to publish a message to a durable topic endpoint and how to read messages from a durable topic endpoint.


Note: when I try to publish a message to a durable topic endpoint that has already been created, the message does not go to that endpoint; instead, a new durable queue is created and the message is sent to that queue.

@marc

Comments

  • TomF
    TomF Member, Employee Posts: 406 Solace Employee

    @Ravi , are you sure you need a durable topic endpoint?

    A DTE is very similar to a queue, so if applications were listening to your signal to stop but were down, they could come back up, see the stop signal on the DTE and stop again.

    It sounds like you need to use a topic.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @Ravi,

    As @TomF mentioned, I don't think you actually need a durable topic endpoint. For almost all use cases we recommend the use of queues. (check out this queue vs. topic endpoints blog)

    I think what you would want to do is publish a persistent message to a topic (topic is just metadata on the message) and then have your apps consume from a queue that is subscribed to that topic. This way you publish once and have all of your consuming apps receive it in a guaranteed manner.

    ***Note that this fits perfectly with the Solace Spring Cloud Stream Binder's defaults as it publishes to topics and consumes from queues.

  • Ravi
    Ravi Member Posts: 12
    edited September 2022 #4

    @TomF @marc: Maybe I was not clear.

    We receive around 3 million messages on a queue daily, around the clock, and we want to consume each message as soon as possible. A single application runs as multiple replicas (e.g. 5 instances), as shown in the attached image.

    If, due to some unforeseen issue, messages are not processed successfully (i.e. there is an error in processing), we want to stop listening to the queue, investigate and fix the issue, then have all 5 instances start listening to the queue again and continue processing the messages that are in the queue.

    In K8s we call the application URL using the service name, so the K8s ingress LB sends the request to only one of the 5 instances. Because of this, when I call the API to stop listening to the queue, only one instance stops. Even if I send the request multiple times, there is no guarantee that all 5 instances will stop listening to the queue, because of the LB in the K8s ingress.

    So to achieve this I want to go with the pub/sub model, i.e. use a topic endpoint to handle all the error alert calls.

    That is, whenever I get an error while processing messages from the queue, I'll put one message on a topic endpoint telling consumers to stop listening to the queue. All 5 of my instances will subscribe to and listen on this endpoint, so each instance, after receiving the message with the instruction to stop listening to the queue, will handle it accordingly.

    So I would like some sample code showing how I can publish a message to a durable topic endpoint, how all 5 of my instances can register/subscribe to this topic endpoint, and how these 5 instances can then read the message from the topic endpoint and call the API to stop listening to the message processing queue.

    It would be very helpful to get some sample code for publishing messages to a durable topic endpoint and consuming the messages on this topic from all instances of my application.


    If my approach is wrong, please guide me on how I can achieve my requirement.

  • TomF
    TomF Member, Employee Posts: 406 Solace Employee

    @Ravi,

    I'm even more convinced you don't want a durable topic endpoint.

    Use a topic instead. All the applications that are up will subscribe to the topic and will receive the notification to stop processing. The beauty of this is that it doesn't matter how many applications are up: all those that are up will receive the message.

    A durable topic endpoint is a JMS artefact that allows you to subscribe to a topic and persist the messages. You don't need persistence and you don't need JMS. What you need is a subscription in the applications that listens for the stop consuming message.

    @marc, what's the best spring sample for a topic subscriber?

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin
    edited September 2022 #6

    Hi @Ravi

    To follow on to @TomF's response I think there might be a confusion in terminology...and that's because it's confusing. Solace has the concept of "topics" and "topic endpoints". What you want to use here is a "topic" to do fanout to multiple consumers. A "topic endpoint" only allows for 1 consumer.

    When using the Solace Spring Cloud Stream binder you publish to a topic and consume from queue endpoints. (Note - no "Topic Endpoints"). So it would look something like this:



    From your app that gets an error you would publish your error using StreamBridge which sends to a topic using Persistent messaging:

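    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.function.StreamBridge;

    @Autowired
    private StreamBridge streamBridge;

    public void sendStopControlMsg(Exception ex) {
       // Build the control payload and publish it to the topic command/topic/stop.
       CustomControlMessage ctrlMsg = new CustomControlMessage("STOP", ex);
       streamBridge.send("command/topic/stop", ctrlMsg);
    }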
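
    The CustomControlMessage type used above is not defined anywhere in this thread. A minimal sketch of what it might look like follows, assuming a plain POJO with a no-argument constructor and getters/setters (the shape JSON mappers typically expect). The field names `action` and `reason` are hypothetical, and storing the exception as a short text summary rather than as the exception object is a choice made for this sketch, not something the thread specifies.

    ```java
    import java.util.Objects;

    // Hypothetical payload type: the thread uses CustomControlMessage but never defines it.
    final class CustomControlMessage {
        private String action; // e.g. "STOP" or "START"
        private String reason; // short text summary of the triggering exception, may be null

        // No-argument constructor so that a JSON mapper can instantiate the type.
        public CustomControlMessage() { }

        public CustomControlMessage(String action, Throwable cause) {
            this.action = Objects.requireNonNull(action, "action");
            // Keep only a summary of the exception; the exception object itself is not stored.
            this.reason = cause == null
                    ? null
                    : cause.getClass().getSimpleName() + ": " + cause.getMessage();
        }

        public String getAction() { return action; }
        public void setAction(String action) { this.action = action; }
        public String getReason() { return reason; }
        public void setReason(String reason) { this.reason = reason; }

        @Override
        public String toString() {
            return "CustomControlMessage{action='" + action + "', reason='" + reason + "'}";
        }
    }
    ```

    With a type like this, the same class can carry both the stop and the start/resume commands by changing only the `action` value.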
    


    In the apps that need to receive the control message, you would just create a Consumer to receive and act on it, and configure the binding to listen on that topic. Under the covers the Solace binder will create a queue that subscribes to that topic.

    import java.util.function.Consumer;
    import org.springframework.context.annotation.Bean;

    @Bean
    public Consumer<CustomControlMessage> handleControlMsg() {
      return controlMsg -> {
        System.out.println("Received CustomControlMessage " + controlMsg);
        // Act on the control action.
        // You may want this same function to handle start and stop (or pause and resume) ;)
      };
    }
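
    The "act on the control action" step could be sketched roughly as below, using only the JDK. Everything here is an assumption for illustration: the `QueueListenerControl` interface and `ControlActionDispatcher` class are hypothetical names, and the action strings "STOP" and "START" simply follow the "STOP" value used in the publisher snippet. In a real Spring Cloud Stream app the two callbacks might delegate to the framework's binding lifecycle support (for example `BindingsLifecycleController`); check which lifecycle operations your version exposes before relying on that.

    ```java
    import java.util.Locale;
    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.function.Consumer;

    // Hypothetical hook for whatever actually stops and starts the work-queue consumer.
    interface QueueListenerControl {
        void stop();
        void start();
    }

    // Maps a control action string onto the hook, ignoring repeats and unknown actions.
    final class ControlActionDispatcher implements Consumer<String> {
        private final QueueListenerControl control;
        private final AtomicBoolean running = new AtomicBoolean(true);

        ControlActionDispatcher(QueueListenerControl control) {
            this.control = control;
        }

        @Override
        public void accept(String action) {
            if (action == null) {
                return;
            }
            switch (action.trim().toUpperCase(Locale.ROOT)) {
                case "STOP":
                    // compareAndSet makes a repeated STOP a no-op.
                    if (running.compareAndSet(true, false)) {
                        control.stop();
                    }
                    break;
                case "START":
                    if (running.compareAndSet(false, true)) {
                        control.start();
                    }
                    break;
                default:
                    // Unknown actions are ignored in this sketch.
                    break;
            }
        }

        boolean isRunning() {
            return running.get();
        }
    }
    ```

    Inside the Consumer above you would pass the action carried by the control message to `accept`. Tracking the state in an `AtomicBoolean` is one way to keep the handler safe if several instances publish a stop message for the same incident.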
    

    In the config tell it the topics you want to subscribe to, using wildcards if you desire.

    spring.cloud.stream.function.bindings.handleControlMsg-in-0=command/topic/> 
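
    To make the wildcard concrete: in a Solace topic subscription, `>` at the last level matches one or more remaining levels, so `command/topic/>` would cover `command/topic/stop` and `command/topic/start` but not `command/topic` itself. The sketch below is a simplified, JDK-only illustration of that rule. The class `TopicSubscriptionMatcher` is hypothetical, it is not part of any Solace API, and it deliberately ignores the other wildcard forms.

    ```java
    // Simplified illustration of the '>' subscription wildcard; not a Solace API.
    final class TopicSubscriptionMatcher {
        private TopicSubscriptionMatcher() { }

        static boolean matches(String subscription, String topic) {
            String[] sub = subscription.split("/", -1);
            String[] top = topic.split("/", -1);
            for (int i = 0; i < sub.length; i++) {
                boolean lastLevel = (i == sub.length - 1);
                if (lastLevel && sub[i].equals(">")) {
                    // '>' needs at least one topic level at this position.
                    return top.length > i;
                }
                if (i >= top.length || !sub[i].equals(top[i])) {
                    return false;
                }
            }
            // Without a trailing '>', the level counts must be identical.
            return sub.length == top.length;
        }
    }
    ```

    For example, `TopicSubscriptionMatcher.matches("command/topic/>", "command/topic/stop")` is true, which is why a single binding can receive both the stop and the start commands.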
    


    Hope that helps!

  • Ravi
    Ravi Member Posts: 12

    Thanks @TomF & @marc for the response.

    @marc In the consumer application, as per the above, can I have a binding on the topic, i.e. "spring.cloud.stream.function.bindings.handleControlMsg-in-0=command/topic/>", instead of the Solace binder creating a queue that subscribes to this topic? If so, then I am good with the solution. We'll try to implement it accordingly.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @Ravi,

    I'm confused by your question. Are you asking if a non-Spring Cloud Stream app can consume messages that the Spring Cloud Stream app publishes to command/topic/stop? If so, then yes, you can receive the messages on a topic via apps using other languages/protocols. Those apps will just need to know how to parse the payload.

  • Ravi
    Ravi Member Posts: 12

    Thanks @Marc, we'll try to implement accordingly