Publish Confirmation when using batch publish of Spring Cloud Stream binder for Solace PubSub+
I was posed with the question of whether it would be possible to get publish confirmation with a Batch Publish operation when using Spring Cloud Stream binder for Solace PubSub+.
The answer is Yes! Just that you would have to check on the correlation data in a separate thread. Here is a sample function, the last published message would fail because of ACL issue - the confirmation block in the thread captures that correctly.
@Bean public Function<String, Collection<Message<String>>> batchPublish() { return v -> { System.out.println("Received trigger to publish Batch of Messages"); ArrayList<CorrelationData> corr = new ArrayList<CorrelationData>(); corr.add(new CorrelationData()); corr.add(new CorrelationData()); corr.add(new CorrelationData()); corr.add(new CorrelationData()); corr.add(new CorrelationData()); corr.add(new CorrelationData()); ArrayList<Message<String>> msgList = new ArrayList<Message<String>>(); for (int i=0; i<corr.size()-1; i++) { msgList.add(MessageBuilder.withPayload("Payload " + i) .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, corr.get(i)) .build()); } msgList.add(MessageBuilder.withPayload("Payload " + (corr.size()-1)) .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, corr.get(corr.size()-1)) .setHeader(BinderHeaders.TARGET_DESTINATION, "publish/topic/fail") .build()); new Thread() { public void run() { for (int i=0; i<corr.size(); i++) { try { corr.get(i).getFuture().get(10, TimeUnit.SECONDS); System.out.println("Publish Successful for correlation: " + i); } catch (InterruptedException | ExecutionException | TimeoutException e) { System.out.println("Publish Failed - for correlation: " + i); e.printStackTrace(); } } } }.start(); System.out.println("Publish Batch of 6 Messages"); return msgList; }; }
Just thought of sharing this. If there are other alternatives, would love to hear.
Comments
-
Hi Giri,
I am looking for similar functionality. Had posted a question here https://stackoverflow.com/questions/75917883/post-processing-after-spring-cloud-stream-function
Can you please share working sample?
What is CorrelationData class and its contents?
Thanks
Tilak
0 -
Hi @Tilak, If I understand your requirement correctly - you want to update the database after a successful publish operation (in the processor). You can try the same logic that I laid out at the start of the discussion.
a) Return a message set with correlation in the processor
b) Before returning, spawn a thread to check on correlation success; if yes, issue your database update
Note that a return from the processor simply hands the control back to the spring cloud stream framework, and you can't get it back. A CorrelationData class employs a SettableListenableFuture that can be checked on for success/failure later, just instantiate it and set it on the message's header - you should be able to check on the correlation for the status of the operation.
CorrelationData
code is here.0