Spring Cloud Stream with Solace

akg17
akg17 Member Posts: 76

I have a couple of questions about the Solace binder.
1. I tried an example of the DLQ functionality where I throw an exception from the listener and the message is published to the error queue. Does Solace have any mechanism to reprocess those messages, other than adding a listener and reprocessing them manually in code? The same question applies to the DMQ.

Comments

  • akg17
    akg17 Member Posts: 76

    With Spring Cloud Stream I also want to use something like a solaceTemplate to make a call and siphon messages from the queue. What configuration and code changes does that need, and is there any resource available?

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 955 admin

    Hi @akg17,
    You have two "out of the box" options for handling failed consumer messages with the Solace binder, as defined here.
    Option 1: Message Redelivery unbinds and rebinds your app from the broker, which results in the message being redelivered to your app. Depending on the max-redeliveries setting on your queue, and if the queue has a Solace DMQ configured, the message could eventually fall to the DMQ for handling.
    Option 2: Error Queue Republishing sends messages that throw an exception to an error queue. This error queue is just another regular queue on the broker that any app can listen on.

    Note that those are the "out of the box" options, but of course depending on your use case there are custom approaches you could take.

  • akg17
    akg17 Member Posts: 76

    @marc I have gone through this document. I know how the error queue and the DMQ differ and what each is used for; the challenge now is reprocessing.
    1. I can add a listener to my error queue, subscribe, and reprocess the messages. But our project's existing code uses kafkaTemplate and RabbitMqTemplate to siphon messages from the queue, even providing the number of messages to take. For Solace I did something similar long ago using JMS and JndiTemplate, but I am not sure how to do the same thing with Spring Cloud Stream. I need to consume messages whenever we want, not through a listener. One approach could be to siphon a number of messages from the queue, save them to a database, and then view them in the UI and reprocess them, which could mean dropping them back onto the same queue or something else depending on the status of the message.
    2. I have the same question for DMQ message reprocessing.
    I need your help with reprocessing: how do I do the same thing with Spring Cloud Stream and Solace that we do with JMS and JndiTemplate? See the code below, which I used a year ago for JMS:

        InitialContext initialContext = null;
        SolSession session = null;
        List<DeadLetterMessage> deadLetterMessages = new ArrayList<>();
        try {
            session = (SolSession) solConnection.createSession(true, 0);
            session.start();
            initialContext = (InitialContext) jndiTemplate.getContext();
            Queue dlqQueue = (Queue) initialContext.lookup(dlq);
            MessageConsumer consumer = session.createConsumer(dlqQueue);
            for (int i = 0; i < numOfMessage; i++) {
                try {
                    TextMessage textMessage = (TextMessage) consumer.receive(1000);
                    if (textMessage == null) {
                        // receive() timed out, so the queue has no more messages to siphon
                        break;
                    }
                    DeadLetterMessage deadLetterMessage = objectMapper.readValue(textMessage.getText(), DeadLetterMessage.class);
                    deadLetterMessage.setRecoveryStatus(RecoveryStatus.SIPHONED);
                    deadLetterMessages.add(deadLetterMessage);
                } catch (Exception e) {
                    log.error("Failed Parsing Message " + e.getMessage());
                }
            }
    
  • UshShukla
    UshShukla Member, Employee Posts: 4 Solace Employee

    Hey @akg17
    Thanks for your patience while we worked this out. If I understood your question, you were asking how to poll a DLQ on-demand when using Spring Cloud Stream.
    I've posted a demo on the Solace GitHub that details how to go about doing this.

    The general approach is as follows:
    1. Define a pollable source (the queue you want to poll) in application.properties, along with the number of messages to get on each poll.
    2. Instantiate PollableMessageSource to get a reference to the queue in your code.
    3. Invoke the poll method on PollableMessageSource as needed to get data.
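
    The three steps above can be sketched without a broker. This is only an illustration, not the demo code: the nested PollableSource interface is a hypothetical stand-in that mirrors the shape of PollableMessageSource.poll(handler) (hand at most one message to a handler and return whether one was received), and InMemorySource, siphon and maxMessages are made-up names for this example.

    ```java
    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.Queue;
    import java.util.function.Consumer;

    final class OnDemandSiphon {

        /** Hypothetical stand-in for a pollable source; not the Spring interface itself. */
        interface PollableSource<T> {
            /** Returns true if a message was received and handled, false if none was available. */
            boolean poll(Consumer<T> handler);
        }

        /** In-memory stand-in for the broker queue so the sketch runs on its own. */
        static final class InMemorySource<T> implements PollableSource<T> {
            private final Queue<T> queue = new ArrayDeque<>();

            InMemorySource(List<T> messages) {
                queue.addAll(messages);
            }

            @Override
            public boolean poll(Consumer<T> handler) {
                T next = queue.poll();
                if (next == null) {
                    return false; // nothing waiting on the queue
                }
                handler.accept(next);
                return true;
            }
        }

        /** Pulls up to maxMessages from the source, stopping early once it is empty. */
        static <T> List<T> siphon(PollableSource<T> source, int maxMessages) {
            List<T> siphoned = new ArrayList<>();
            for (int i = 0; i < maxMessages; i++) {
                if (!source.poll(siphoned::add)) {
                    break;
                }
            }
            return siphoned;
        }

        public static void main(String[] args) {
            PollableSource<String> dlq = new InMemorySource<>(List.of("m1", "m2", "m3"));
            System.out.println(siphon(dlq, 2)); // prints [m1, m2]
            System.out.println(siphon(dlq, 5)); // prints [m3]
        }
    }
    ```

    In a real application the source would be the injected PollableMessageSource, and siphon would be called from wherever the on-demand trigger lives, for example a REST endpoint or a scheduled job.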

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 955 admin

    Awesome, thanks @UshShukla for looking into this and even providing a working example 🥳

  • akg17
    akg17 Member Posts: 76

    @marc and @UshShukla Thanks, I have two options now: either use the pollable source as you mentioned, or jmsTemplate.
    I was facing an issue parsing the bytes message coming from a MessageChannel and learned that message.getBody of JMS 2.0 is not yet supported, so I had to get the bytes and convert them to a string.
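
    The bytes-to-string step I ended up with looks roughly like the sketch below. PayloadText and asText are hypothetical names for this example, and UTF-8 is an assumption about how the publisher encoded the text.

    ```java
    import java.nio.charset.StandardCharsets;

    final class PayloadText {

        /** Converts a payload that may arrive as byte[] or String into text (UTF-8 assumed). */
        static String asText(Object payload) {
            if (payload instanceof byte[]) {
                return new String((byte[]) payload, StandardCharsets.UTF_8);
            }
            if (payload instanceof String) {
                return (String) payload;
            }
            throw new IllegalArgumentException("Unsupported payload type: "
                    + (payload == null ? "null" : payload.getClass().getName()));
        }

        public static void main(String[] args) {
            byte[] raw = "{\"id\":42}".getBytes(StandardCharsets.UTF_8);
            System.out.println(asText(raw)); // prints {"id":42}
        }
    }
    ```

    Naming the charset explicitly avoids depending on the platform default, which can differ between environments.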

  • nagendra
    nagendra Member Posts: 1

    @UshShukla said:
    Hey @akg17
    Thanks for your patience while we worked this out. If I understood your question, you were asking how to poll a DLQ on-demand when using Spring Cloud Stream.
    I've posted a demo on the Solace Github that details how to go about doing this.

    The general approach is as follows:
    1. Define a pollable source (the queue you want to poll) in application.properties, along with the number of messages to get on each poll.
    2. Instantiate PollableMessageSource to get a reference to the queue in your code
    3. Invoke the poll method on PollableMessageSource as needed to get data.

    Hi @UshShukla, thanks for this demo code. It helped me a great deal. I am working on a solution where I need to queue up all messages in the error queue while one of the sync systems is down (to achieve zero RPO). I have sized the error queue for 30 minutes of database downtime. My poller runs every 30 minutes and needs to process all the messages in the error queue. I have modified the demo code as below:

        public void poll() {
            while (true) {
                boolean result = this.source.poll(receive -> {
                    String msg = new String((byte[]) receive.getPayload());
                    System.out.println("Polling received message with Message " + msg);
                    // Do process and if some conditions arise, throw exception
                    boolean condition = true;
                    if (condition) {
                        throw new RuntimeException("Permanent failure");
                    }
                });

                if (!result) {
                    System.out.println("No more messages to process");
                    break;
                }
            }
        }
    

    This works great if no RuntimeException is thrown or if there is only one message in the queue. If there is more than one message in the queue, e.g. "Hello World 1", "Hello World 2", "Hello World 3", the above code prints "Hello World 1" twice and all 3 messages disappear from the queue. Please note max-delivery is set to 1 and the queue itself has redelivery set to 1.
    Please note the following setting:

    spring.cloud.stream.bindings.poller-in-0.destination = <somevalue>
    spring.cloud.stream.bindings.poller-in-0.group = <somegroup>
    spring.cloud.stream.pollable-source = poller
    spring.cloud.stream.poller.max-messages-per-poll = 1
    
    

    Would you have any suggestions? Thanks in advance.

  • sakshi_123
    sakshi_123 Member Posts: 9

    What is the difference between the destination and the pollable source? Shouldn't both refer to the same Solace queue that is used for polling?