Publish Confirmation when using batch publish of Spring Cloud Stream binder for Solace PubSub+

giri
giri Member, Administrator, Employee Posts: 116 admin
edited September 2022 in General Discussions #1

I was posed with the question of whether it would be possible to get publish confirmation with a Batch Publish operation when using Spring Cloud Stream binder for Solace PubSub+.

The answer is Yes! Just that you would have to check on the correlation data in a separate thread. Here is a sample function, the last published message would fail because of ACL issue - the confirmation block in the thread captures that correctly.


@Bean

public Function<String, Collection<Message<String>>> batchPublish() {
   return v -> {
       System.out.println("Received trigger to publish Batch of Messages");
       ArrayList<CorrelationData> corr = new ArrayList<CorrelationData>();
       corr.add(new CorrelationData());
       corr.add(new CorrelationData());
       corr.add(new CorrelationData());
       corr.add(new CorrelationData());
       corr.add(new CorrelationData());
       corr.add(new CorrelationData());

       ArrayList<Message<String>> msgList = new ArrayList<Message<String>>();
       for (int i=0; i<corr.size()-1; i++) {
           msgList.add(MessageBuilder.withPayload("Payload " + i)
              .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, corr.get(i))
              .build());
       }

       msgList.add(MessageBuilder.withPayload("Payload " + (corr.size()-1))
           .setHeader(SolaceBinderHeaders.CONFIRM_CORRELATION, corr.get(corr.size()-1))
           .setHeader(BinderHeaders.TARGET_DESTINATION, "publish/topic/fail")
           .build());

       new Thread() {
           public void run() {
              for (int i=0; i<corr.size(); i++) {
                  try {
                     corr.get(i).getFuture().get(10, TimeUnit.SECONDS);
                     System.out.println("Publish Successful for correlation: " + i);
                  } catch (InterruptedException | ExecutionException | TimeoutException e) {
                     System.out.println("Publish Failed - for correlation: " + i);
                     e.printStackTrace();
                  }
              }
           }
       }.start();

       System.out.println("Publish Batch of 6 Messages");
       return msgList;
   };
}

Just thought of sharing this. If there are other alternatives, would love to hear.

Comments

  • Tilak
    Tilak Member Posts: 6

    Hi Giri,

    I am looking for similar functionality. Had posted a question here https://stackoverflow.com/questions/75917883/post-processing-after-spring-cloud-stream-function

    Can you please share working sample?

    What is CorrelationData class and its contents?


    Thanks

    Tilak

  • giri
    giri Member, Administrator, Employee Posts: 116 admin

    Hi @Tilak, If I understand your requirement correctly - you want to update the database after a successful publish operation (in the processor). You can try the same logic that I laid out at the start of the discussion.

    a) Return a message set with correlation in the processor

    b) Before returning, spawn a thread to check on correlation success; if yes, issue your database update

    Note that a return from the processor simply hands the control back to the spring cloud stream framework, and you can't get it back. A CorrelationData class employs a SettableListenableFuture that can be checked on for success/failure later, just instantiate it and set it on the message's header - you should be able to check on the correlation for the status of the operation.

    CorrelationData code is here.