Spring Cloud Stream with Solace
I have a couple of questions about the Solace binder.
1. I tried an example of the DLQ functionality in which I threw an exception from the listener and the message was published to the error queue. Does Solace have any mechanism to reprocess such messages, other than adding a listener and reprocessing them manually in code? The same question applies to the DMQ.
Comments
-
Hi @akg17,
You have two "out of the box" options for handling failed consumer messages with the Solace binder, as defined here.
Option 1: Message Redelivery basically unbinds and rebinds your app from the broker, which results in the message being redelivered to your app. Depending on how you configure max-redeliveries on your queue, and whether a Solace DMQ is configured for it, the message could end up falling to the DMQ for handling.
Option 2: Error Queue Republishing sends messages that throw an exception to an error queue. This error queue is just another regular queue on the broker that any app can listen on.
Note that those are the "out of the box" options; depending on your use case, there are of course custom approaches you could take.
-
@marc I have gone through this document. I know how the error queue and the DMQ differ and what each is used for. The challenge now is reprocessing.
1. I can add a listener to my error queue, subscribe, and reprocess the messages. However, our project's existing code uses kafkaTemplate and RabbitMqTemplate to siphon messages from the queue on demand, even providing the number of messages to fetch.
For Solace I did something similar a long time ago using JMS and JndiTemplate, but I am not sure how to do the same thing with Spring Cloud Stream. I need to consume messages whenever we want, not through a listener. One approach could be to siphon a number of messages from the queue, save them to a database, and then view them in the UI and reprocess them, which could mean dropping them back onto the same queue or handling them some other way, depending on the message status.
2. The same question applies to DMQ message reprocessing.
I need your help with reprocessing: how do I do with Spring Cloud Stream and Solace what we did with JMS and JndiTemplate? See the code below, which I used a year ago for JMS:
    InitialContext initialContext = null;
    SolSession session = null;
    List<DeadLetterMessage> deadLetterMessages = new ArrayList<>();
    try {
        // Transacted session on the existing Solace connection
        session = (SolSession) solConnection.createSession(true, 0);
        session.start();
        // Look up the dead letter queue through JNDI
        initialContext = (InitialContext) jndiTemplate.getContext();
        Queue dlqQueue = (Queue) initialContext.lookup(dlq);
        MessageConsumer consumer = session.createConsumer(dlqQueue);
        // Siphon up to numOfMessage messages, waiting at most 1 second for each
        for (int i = 0; i < numOfMessage; i++) {
            try {
                TextMessage textMessage = (TextMessage) consumer.receive(1000);
                DeadLetterMessage deadLetterMessage = objectMapper.readValue(textMessage.getText(), DeadLetterMessage.class);
                deadLetterMessage.setRecoveryStatus(RecoveryStatus.SIPHONED);
                deadLetterMessages.add(deadLetterMessage);
            } catch (Exception e) {
                log.error("Failed Parsing Message " + e.getMessage());
            }
        }
-
Hey @akg17
Thanks for your patience while we worked this out. If I understood your question correctly, you were asking how to poll a DLQ on demand when using Spring Cloud Stream.
I've posted a demo on the Solace GitHub that details how to go about doing this. The general approach is as follows:
1. Define a pollable source (the queue you want to poll) in application.properties, along with the number of messages to get on each poll.
2. Instantiate PollableMessageSource to get a reference to the queue in your code.
3. Invoke the poll method on PollableMessageSource as needed to get data.
-
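The on-demand polling steps @UshShukla lists can be illustrated with a minimal, broker-free sketch. In a real application the source would be the PollableMessageSource named in those steps. Here a small hypothetical PollableSource interface and an in-memory queue stand in for it, so the names PollableSource, inMemorySource, and siphon are assumptions made for illustration and not part of Spring Cloud Stream or the Solace binder.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

class OnDemandPollSketch {

    // Hypothetical stand-in for a pollable queue binding (assumption, not the Spring type):
    // poll() hands one message to the handler and returns false when nothing was received.
    interface PollableSource {
        boolean poll(Consumer<byte[]> handler);
    }

    // In-memory stand-in for the broker queue so the sketch runs without a broker.
    static PollableSource inMemorySource(Deque<byte[]> queue) {
        return handler -> {
            byte[] payload = queue.poll();
            if (payload == null) {
                return false; // queue is empty
            }
            handler.accept(payload);
            return true;
        };
    }

    // Pulls at most maxMessages from the source, stopping early once the queue is empty.
    static List<String> siphon(PollableSource source, int maxMessages) {
        List<String> siphoned = new ArrayList<>();
        for (int i = 0; i < maxMessages; i++) {
            boolean received = source.poll(
                    payload -> siphoned.add(new String(payload, StandardCharsets.UTF_8)));
            if (!received) {
                break;
            }
        }
        return siphoned;
    }
}
```

Calling siphon(source, n) whenever the application decides to (for example from a UI action or a scheduled job) returns at most n messages and leaves the rest on the queue, which is the "consume a given number of messages on demand" behavior asked about earlier in the thread.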
Awesome, thanks @UshShukla for looking into this and even providing a working example 🥳
-
@marc and @UshShukla Thanks. I have two options now: either use a pollable source as you mentioned, or jmsTemplate.
I was having trouble parsing the bytes message coming from a MessageChannel and found out that message.getBody from JMS 2.0 is not yet supported, so I had to get the bytes and then convert them to a string.
-
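The bytes-to-string workaround @akg17 describes can be sketched as a small helper. This is only an illustration: the class name PayloadText and the method asText are hypothetical, and UTF-8 is an assumption about how the publisher encoded the text.

```java
import java.nio.charset.StandardCharsets;

class PayloadText {

    // Converts a message payload to text.
    // Assumption: byte[] payloads contain UTF-8 encoded text.
    static String asText(Object payload) {
        if (payload instanceof byte[]) {
            return new String((byte[]) payload, StandardCharsets.UTF_8);
        }
        if (payload instanceof String) {
            return (String) payload;
        }
        throw new IllegalArgumentException("Unsupported payload type: "
                + (payload == null ? "null" : payload.getClass().getName()));
    }
}
```

Naming the charset explicitly avoids depending on the JVM's default encoding, which can differ between environments.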
Hi @UshShukla, thanks for this demo code. It helped me a great deal. I am working on a solution where I need to queue up all the messages in the error queue when one of the sync systems is down (to achieve zero RPO). I have provisioned the error queue size for 30 minutes of database downtime. My poller runs every 30 minutes and needs to process all the messages in the error queue. I have modified the demo code as below:
    public void poll() {
        while (true) {
            Boolean result = this.source.poll(receive -> {
                String msg = new String((byte[]) receive.getPayload());
                System.out.println("Polling received message with Message " + msg);
                // Do process and if some conditions arise, throw exception
                Boolean condition = true;
                if (condition) {
                    throw new RuntimeException("Permanent failure");
                }
            });
            if (!result) {
                System.out.println("No more messages to process");
                break;
            }
        }
    }
This works great if the RuntimeException is not thrown or if there is only one message in the queue. If there is more than one message in the queue, e.g. "Hello World 1", "Hello World 2", "Hello World 3", the above code prints "Hello World 1" twice and all three messages disappear from the queue. Please note that max-delivery is set to 1 and the queue itself has redelivery set to 1.
Please note the following settings:
    spring.cloud.stream.bindings.poller-in-0.destination = <somevalue>
    spring.cloud.stream.bindings.poller-in-0.group = <somegroup>
    spring.cloud.stream.pollable-source = poller
    spring.cloud.stream.poller.max-messages-per-poll = 1
Would you have any suggestions? Thanks in advance.
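One variation that may make a drain loop like this easier to reason about is to catch processing failures inside the handler and record them, so that no exception escapes the poll call. The sketch below shows only that pattern under stated assumptions: PollableSource is a hypothetical stand-in rather than the Spring Cloud Stream type, DrainResult and drain are illustrative names, and the sketch does not model how the real binder acknowledges or redelivers messages, so it does not explain the behavior reported above.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class DrainWithFailuresSketch {

    // Hypothetical stand-in for the pollable binding (assumption, not the Spring type):
    // poll() hands one message to the handler and returns false when nothing was received.
    interface PollableSource {
        boolean poll(Consumer<byte[]> handler);
    }

    // Outcome of one drain run: messages that were processed and messages that failed.
    static final class DrainResult {
        final List<String> processed = new ArrayList<>();
        final List<String> failed = new ArrayList<>();
    }

    // Polls until the source is empty or maxMessages polls have been made.
    // Failures are recorded per message instead of being thrown out of the handler.
    static DrainResult drain(PollableSource source, Consumer<String> processor, int maxMessages) {
        DrainResult result = new DrainResult();
        for (int i = 0; i < maxMessages; i++) {
            boolean received = source.poll(payload -> {
                String msg = new String(payload, StandardCharsets.UTF_8);
                try {
                    processor.accept(msg);
                    result.processed.add(msg);
                } catch (RuntimeException e) {
                    // Record the failure; the caller decides what to do with it afterwards.
                    result.failed.add(msg);
                }
            });
            if (!received) {
                break; // nothing left to poll
            }
        }
        return result;
    }
}
```

The maxMessages bound replaces the unbounded while (true) so that a single run cannot spin forever. Whether a recorded failure should then be republished, persisted, or dropped depends on the use case and is left to the caller.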
-
What is the difference between destination and pollable source? Shouldn't they both be set to the same Solace queue that is used for polling?