Publish Confirmation when using batch publish of Spring Cloud Stream binder for Solace PubSub+

I was asked whether it is possible to get publish confirmations for a batch publish operation when using the Spring Cloud Stream binder for Solace PubSub+.
The answer is yes! You just have to check the correlation data in a separate thread. Here is a sample function. The last message in the batch is published to a topic that fails because of an ACL issue, and the confirmation block in the thread captures that correctly.

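import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import org.springframework.cloud.stream.binder.BinderHeaders;
import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import com.solace.spring.cloud.stream.binder.messaging.SolaceBinderHeaders;
import com.solace.spring.cloud.stream.binder.util.CorrelationData;

@Bean
public Function<String, Collection<Message<?>>> batchPublish() {
    return v -> {
        System.out.println("Received trigger to publish Batch of Messages");
        // One CorrelationData per message; typed so getFuture() is visible below
        List<CorrelationData> corr = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            corr.add(new CorrelationData());
        }
        List<Message<?>> msgList = new ArrayList<>();
        for (int i = 0; i < corr.size() - 1; i++) {
            msgList.add(MessageBuilder.withPayload("Payload " + i)
                    .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, corr.get(i))
                    .build());
        }
        // The last message goes to a topic that the ACL rejects
        msgList.add(MessageBuilder.withPayload("Payload " + (corr.size() - 1))
                .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, corr.get(corr.size() - 1))
                .setHeader(BinderHeaders.TARGET_DESTINATION, "publish/topic/fail")
                .build());
        // Check the confirmations in a separate thread, since the publish
        // only happens after this function returns
        new Thread(() -> {
            for (int i = 0; i < corr.size(); i++) {
                try {
                    corr.get(i).getFuture().get(10, TimeUnit.SECONDS);
                    System.out.println("Publish Successful for correlation: " + i);
                } catch (InterruptedException | ExecutionException | TimeoutException e) {
                    System.out.println("Publish Failed - for correlation: " + i);
                    e.printStackTrace();
                }
            }
        }).start();
        System.out.println("Publish Batch of 6 Messages");
        return msgList;
    };
}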
Just thought I would share this. If there are other alternatives, I would love to hear about them.

Hi Giri,
I am looking for similar functionality. I had posted a question on Stack Overflow: "solace - Post processing after Spring cloud stream Function".
Can you please share a working sample?
What is the CorrelationData class, and what does it contain?

Thanks
Tilak

Hi @Tilak, if I understand your requirement correctly, you want to update the database after a successful publish operation (in the processor). You can try the same logic that I laid out at the start of the discussion.
a) Return a message set with correlation data from the processor
b) Before returning, spawn a thread that checks each correlation for success; on success, issue your database update
Note that returning from the processor simply hands control back to the Spring Cloud Stream framework, and you can't get it back. The CorrelationData class employs a SettableListenableFuture that can be checked later for success or failure. Just instantiate it and set it on the message's header, and you should be able to check the correlation for the status of the publish operation.
The CorrelationData code is here.
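
To make steps (a) and (b) concrete, here is a minimal sketch that runs without the binder. Everything in it is an assumption for illustration: `Confirmation` is a hypothetical stand-in for CorrelationData (it wraps a plain `CompletableFuture` rather than the binder's future type), `updatedRecords` stands in for the database, and `main` simulates the broker's responses by completing the futures by hand. In a real processor you would set actual CorrelationData objects on the message headers as in the sample at the start of the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PostPublishSketch {

    // Hypothetical stand-in for CorrelationData: holds a future that is
    // completed (or failed) once the publish outcome is known.
    static class Confirmation {
        private final CompletableFuture<Void> future = new CompletableFuture<>();

        CompletableFuture<Void> getFuture() {
            return future;
        }
    }

    // Hypothetical stand-in for the database being updated after a publish.
    static final List<String> updatedRecords = new CopyOnWriteArrayList<>();

    // Step (b): wait on each confirmation in a separate thread and run the
    // "database update" only for messages whose publish succeeded.
    static Thread watch(List<Confirmation> confirmations) {
        Thread watcher = new Thread(() -> {
            for (int i = 0; i < confirmations.size(); i++) {
                try {
                    confirmations.get(i).getFuture().get(10, TimeUnit.SECONDS);
                    updatedRecords.add("record-" + i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve interrupt status
                    return;
                } catch (ExecutionException | TimeoutException e) {
                    System.out.println("Publish failed for correlation " + i + ": " + e);
                }
            }
        });
        watcher.start();
        return watcher;
    }

    public static void main(String[] args) throws InterruptedException {
        // Step (a): one confirmation per outgoing message
        List<Confirmation> confirmations = new ArrayList<>();
        for (int i = 0; i < 3; i++) {
            confirmations.add(new Confirmation());
        }
        Thread watcher = watch(confirmations);

        // Simulated outcomes: two successes, then one rejection
        confirmations.get(0).getFuture().complete(null);
        confirmations.get(1).getFuture().complete(null);
        confirmations.get(2).getFuture()
                .completeExceptionally(new RuntimeException("simulated ACL rejection"));

        watcher.join();
        System.out.println("Updated: " + updatedRecords);
    }
}
```

Only the first two records end up in `updatedRecords`; the rejected publish surfaces as an `ExecutionException` in the watcher thread, so its update is skipped.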