Spring Cloud Stream publishing messages

Mike13
Mike13 Member Posts: 29

I have heard that Spring Cloud Stream (SCS) should no longer be used together with Flux, because Flux breaks retry handling, error handling, and transmission confirmation.
Messages can be published with StreamBridge, but in my opinion StreamBridge is not quite consistent with the otherwise declarative and functional style of SCS.
Are there alternatives that let me keep working declaratively and functionally when publishing messages?

Comments

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 955 admin

    Hi @Mike13,

    Hope you're doing well! There is actually another option to publish messages to dynamic topics. Instead of StreamBridge you can specify a special header using BinderHeaders.TARGET_DESTINATION. This tells the binder (if the binder supports it - the Solace binder does) to override the output destination and send to this topic instead.

    You can find more info here in section 6: https://codelabs.solace.dev/codelabs/spring-cloud-stream-beyond/index.html?index=../..index#5

    Hope that helps!

  • Mike13
    Mike13 Member Posts: 29

    Hi @marc

    I am very well, thanks. How are you? Yes, we use this option (BinderHeaders.TARGET_DESTINATION) for several use cases (e.g. request/reply with a dynamic reply address). All the cases I know of use a Function; I don't know of any that uses a Supplier.
    The example in the link you provided (thanks for that) also uses a Function. The article says that this would work with a Supplier too, but how :-)?

    Best regards
    Mike

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 955 admin
    edited September 2021 #4

    Hey @Mike13,

    Hmm... I only have a few minutes, so I haven't compiled this, but something like this should work:

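    import java.util.function.Supplier;
    import org.springframework.cloud.stream.binder.BinderHeaders;
    import org.springframework.context.annotation.Bean;
    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;

    @Bean
    public Supplier<Message<String>> mySupplier() {
        return () -> {
            String payload = "whatever";
            // Placeholder: compute the destination topic per message.
            String myTopic = useYourBusinessLogic();
            logger.info("Publishing to: " + myTopic);
            // The header tells the binder to send to myTopic instead of the configured destination.
            return MessageBuilder.withPayload(payload)
                    .setHeader(BinderHeaders.TARGET_DESTINATION, myTopic)
                    .build();
        };
    }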
    
  • Mike13
    Mike13 Member Posts: 29

    Hey @marc

    Thanks for the food for thought :-)! I managed to reach my goal:

        @Bean
        public MyObjectPublisher myObject() {
            return new MyObjectPublisher();
        }

        public class MyObjectPublisher implements Supplier<MyObject> {

            // Buffer between publish() callers and the framework that calls get().
            private final BlockingDeque<MyObject> myObjects = new LinkedBlockingDeque<>();

            @Override
            public MyObject get() {
                // Non-blocking: returns null when nothing is queued.
                return myObjects.poll();
            }

            public void publish(MyObject myObject) {
                myObjects.add(myObject);
            }
        }
    
    
        spring.cloud.function.definition=myObject
        spring.cloud.stream.bindings.myObject-out-0.destination=xy
    
    

    To publish, I inject the MyObjectPublisher into my code and call publish(...).
    The problem with this approach: it seems to take forever until a message is published.
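
    A likely reason for the delay, stated as an assumption and not verified against this setup: as far as I know, Spring Cloud Stream triggers a plain Supplier from a poller, by default about once per second and with one get() call per poll. If that holds, a queue-backed Supplier drains at roughly one message per second. The plain-Java sketch below models only that behavior. QueuePublisher mirrors MyObjectPublisher with String payloads, and PollerModel is a hypothetical stand-in for the framework poller, not a Spring class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Supplier;

/** Queue-backed supplier, same shape as MyObjectPublisher (String payloads for brevity). */
class QueuePublisher implements Supplier<String> {
    private final BlockingDeque<String> queue = new LinkedBlockingDeque<>();

    @Override
    public String get() {
        return queue.poll(); // null when nothing is queued
    }

    public void publish(String payload) {
        queue.add(payload);
    }
}

/** Hypothetical stand-in for the framework poller: one get() call per poll cycle. */
class PollerModel {
    /** Runs the given number of poll cycles and returns what would have been sent. */
    static List<String> runPolls(Supplier<String> supplier, int polls) {
        List<String> sent = new ArrayList<>();
        for (int i = 0; i < polls; i++) {
            String payload = supplier.get();
            if (payload != null) {
                sent.add(payload);
            }
        }
        return sent;
    }

    /** Rough time until the last of n queued messages is sent, assuming one message per poll. */
    static long worstCaseDelayMillis(int queuedMessages, long fixedDelayMillis) {
        return queuedMessages * fixedDelayMillis;
    }

    public static void main(String[] args) {
        QueuePublisher publisher = new QueuePublisher();
        publisher.publish("a");
        publisher.publish("b");
        publisher.publish("c");
        // A single poll cycle sends only the first queued message.
        System.out.println(runPolls(publisher, 1));
        // With an assumed 1000 ms delay, three queued messages need about three cycles.
        System.out.println(worstCaseDelayMillis(3, 1000));
    }
}
```

    If this is indeed the cause, shortening the poller delay or raising the number of messages per poll should help. If I recall correctly, the 3.x line exposes these as spring.cloud.stream.poller.fixed-delay and spring.cloud.stream.poller.max-messages-per-poll, but please check the documentation for your version.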

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 955 admin

    Hey @Mike13,

    Glad you got it working, but IMO this seems a bit harder to follow than StreamBridge would be. It's also not ideal that it takes a while until the message is published. Can you elaborate some more on "not quite consistent with the other declarative and functional handling"?

  • Mike13
    Mike13 Member Posts: 29
    edited September 2021 #7

    Hey @marc
    I'll try to elaborate, yes :-)
    When you have a Consumer or a Function:

    • It is functional
    • You just create a bean, so nothing like "myObject-in-0.destination" appears in the code; the code doesn't even "know" that this bean consumes from a messaging system
    • Everything that smells like messaging is done via the configuration

    I hope that explains it in a way that is somewhat understandable.
    It doesn't bother me much, but it doesn't feel consistent to me.
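
    To make the contrast concrete, here is a minimal plain-Java sketch with no Spring dependencies. Bridge is a hypothetical stand-in for StreamBridge, and the binding name "myObject-out-0" is only an example. The point is where the messaging detail lives: the Consumer knows nothing about bindings, while the imperative publish call has to name one in application code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical stand-in for StreamBridge: the caller names the binding explicitly. */
interface Bridge {
    boolean send(String bindingName, Object payload);
}

class StyleContrast {
    /** Declarative style: a plain Consumer; nothing here mentions messaging. */
    static Consumer<String> myObject(List<String> handled) {
        return payload -> handled.add(payload.toUpperCase());
    }

    /** Imperative style: the binding name appears in application code. */
    static boolean publish(Bridge bridge, String payload) {
        return bridge.send("myObject-out-0", payload);
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();
        myObject(handled).accept("hello");
        System.out.println(handled);

        // A recording bridge shows which binding name the code had to know about.
        List<String> sentTo = new ArrayList<>();
        Bridge recording = (binding, payload) -> sentTo.add(binding);
        publish(recording, "hello");
        System.out.println(sentTo);
    }
}
```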

  • arpit_k
    arpit_k Member Posts: 2

    Hi Team,

    We need some input on Spring Cloud Stream with Solace. We are consuming from queues and want a batch operation that lets us control the number of messages, the batch size, and the sequence of events. What configuration do we need for this? For Kafka I can see it is max.poll.records. Is the configuration below OK, or do we need something else?

    queueMaxMsgSize
    Maximum message size for the consumer group queue.

    queueQuota
    Message spool quota for the consumer group queue.

    In short, we want to consume the data in batches, so that if the number of event messages increases over some period, we can limit consumption to, say, 100 messages per fixed delay of 1000 ms.
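
    To make the requirement concrete, the intended behavior can be sketched in plain Java. This is only an illustration of the limit being asked for, not Solace binder configuration. BatchDrainer and nextBatch are hypothetical names, and a scheduler is assumed to call nextBatch once per fixed delay (for example every 1000 ms).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class BatchDrainer {
    /**
     * Takes at most maxBatchSize messages that are already queued, in FIFO order.
     * Returns an empty list when nothing is waiting.
     */
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxBatchSize) {
        List<T> batch = new ArrayList<>();
        queue.drainTo(batch, maxBatchSize);
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 250; i++) {
            queue.add(i);
        }
        // Each loop iteration stands for one scheduled cycle.
        List<Integer> batch;
        while (!(batch = nextBatch(queue, 100)).isEmpty()) {
            System.out.println("batch of " + batch.size() + " starting at " + batch.get(0));
        }
    }
}
```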

    Thanks in advance
    Arpit