Solace Community is getting a facelift!
On March 3rd we will be starting the process of migrating Solace Community to a new platform. As a result, Solace Community will go in to a temporary read-only state. You will still be able to come onto Solace Community and search through posts to find answers, but you won't be able to ask questions, post comments, or react in any way.
We hope to have the migration complete by Wednesday March 5th (or sooner), so please keep an eye out!
Publish Confirmation when using batch publish of Spring Cloud Stream binder for Solace PubSub+

I was posed with the question of whether it would be possible to get publish confirmation with a Batch Publish operation when using Spring Cloud Stream binder for Solace PubSub+.
The answer is Yes! Just that you would have to check on the correlation data in a separate thread. Here is a sample function, the last published message would fail because of ACL issue - the confirmation block in the thread captures that correctly.
@Bean public Function<String, Collection<Message<String>>> batchPublish() { return v -> { System.out.println("Received trigger to publish Batch of Messages"); ArrayList<CorrelationData> corr = new ArrayList<CorrelationData>(); corr.add(new CorrelationData()); corr.add(new CorrelationData()); corr.add(new CorrelationData()); corr.add(new CorrelationData()); corr.add(new CorrelationData()); corr.add(new CorrelationData()); ArrayList<Message<String>> msgList = new ArrayList<Message<String>>(); for (int i=0; i<corr.size()-1; i++) { msgList.add(MessageBuilder.withPayload("Payload " + i) .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, corr.get(i)) .build()); } msgList.add(MessageBuilder.withPayload("Payload " + (corr.size()-1)) .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, corr.get(corr.size()-1)) .setHeader(BinderHeaders.TARGET_DESTINATION, "publish/topic/fail") .build()); new Thread() { public void run() { for (int i=0; i<corr.size(); i++) { try { corr.get(i).getFuture().get(10, TimeUnit.SECONDS); System.out.println("Publish Successful for correlation: " + i); } catch (InterruptedException | ExecutionException | TimeoutException e) { System.out.println("Publish Failed - for correlation: " + i); e.printStackTrace(); } } } }.start(); System.out.println("Publish Batch of 6 Messages"); return msgList; }; }
Just thought of sharing this. If there are other alternatives, would love to hear.
Comments
-
Hi Giri,
I am looking for similar functionality. Had posted a question here https://stackoverflow.com/questions/75917883/post-processing-after-spring-cloud-stream-function
Can you please share working sample?
What is CorrelationData class and its contents?
Thanks
Tilak
0 -
Hi @Tilak, If I understand your requirement correctly - you want to update the database after a successful publish operation (in the processor). You can try the same logic that I laid out at the start of the discussion.
a) Return a message set with correlation in the processor
b) Before returning, spawn a thread to check on correlation success; if yes, issue your database update
Note that a return from the processor simply hands the control back to the spring cloud stream framework, and you can't get it back. A CorrelationData class employs a SettableListenableFuture that can be checked on for success/failure later, just instantiate it and set it on the message's header - you should be able to check on the correlation for the status of the operation.
CorrelationData
code is here.0