I was asked whether it is possible to get publish confirmations for a Batch Publish operation when using the Spring Cloud Stream binder for Solace PubSub+.
The answer is yes, but you have to check the correlation data in a separate thread. Here is a sample function. The last message in the batch is sent to a topic the client is not allowed to publish to, so it fails because of an ACL issue, and the confirmation block in the thread captures that correctly.
@Bean
public Function<String, Collection<Message>> batchPublish() {
return v -> {
System.out.println("Received trigger to publish Batch of Messages");
// Typed list, so that corr.get(i).getFuture() compiles without a cast
ArrayList<CorrelationData> corr = new ArrayList<>();
corr.add(new CorrelationData());
corr.add(new CorrelationData());
corr.add(new CorrelationData());
corr.add(new CorrelationData());
corr.add(new CorrelationData());
corr.add(new CorrelationData());
ArrayList<Message> msgList = new ArrayList<Message>();
for (int i=0; i<corr.size()-1; i++) {
msgList.add(MessageBuilder.withPayload("Payload " + i)
.setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, corr.get(i))
.build());
}
msgList.add(MessageBuilder.withPayload("Payload " + (corr.size()-1))
.setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, corr.get(corr.size()-1))
.setHeader(BinderHeaders.TARGET_DESTINATION, "publish/topic/fail")
.build());
new Thread() {
public void run() {
for (int i=0; i<corr.size(); i++) {
try {
corr.get(i).getFuture().get(10, TimeUnit.SECONDS);
System.out.println("Publish Successful for correlation: " + i);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
System.out.println("Publish Failed - for correlation: " + i);
e.printStackTrace();
}
}
}
}.start();
System.out.println("Publish Batch of 6 Messages");
return msgList;
};
}
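The waiting loop in that thread can also be pulled out into a small helper, which makes it easier to try out without a broker. This is only a rough sketch using plain java.util.concurrent: the CompletableFuture objects are stand-ins for whatever corr.get(i).getFuture() returns, and the names ConfirmationWaiter and awaitAll are made up for illustration, not part of the Solace binder.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of the Solace binder API.
class ConfirmationWaiter {

    // Waits on each future in order and records true (confirmed)
    // or false (failed, timed out, or interrupted) for that position.
    static List<Boolean> awaitAll(List<? extends Future<?>> futures, long timeout, TimeUnit unit) {
        List<Boolean> results = new ArrayList<>();
        for (Future<?> future : futures) {
            try {
                future.get(timeout, unit);
                results.add(true);
            } catch (InterruptedException e) {
                // Restore the interrupt flag so callers can still see it
                Thread.currentThread().interrupt();
                results.add(false);
            } catch (ExecutionException | TimeoutException e) {
                results.add(false);
            }
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        // Stand-ins for the futures held by the correlation data (assumption)
        CompletableFuture<Void> confirmed = CompletableFuture.completedFuture(null);
        CompletableFuture<Void> rejected = new CompletableFuture<>();
        rejected.completeExceptionally(new IllegalStateException("simulated ACL denial"));

        // Wait off the calling thread, as the function above does with new Thread()
        CompletableFuture<List<Boolean>> outcome = CompletableFuture.supplyAsync(
                () -> awaitAll(Arrays.asList(confirmed, rejected), 10, TimeUnit.SECONDS));

        System.out.println(outcome.get()); // prints [true, false]
    }
}
```

In the real function you would build the list from the correlation data futures before returning msgList and hand it to the helper on a background thread, so the function itself still returns immediately.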
Just thought I'd share this. If there are other ways to do it, I'd love to hear about them.