StreamBridge creating consumer and it's subscription

VarunDamani
VarunDamani Member Posts: 14
edited September 2021 in General Discussions #1

I have created a Spring cloud stream application that publishes messages to Solace. The publish message is triggered by an external event. I am using StreamBridge to achieve this functionality of publishing messages.

My application.properties look like this

Problem: when the application starts it creates a consumer queue and a subscription to it.

Question: Since my application's job is to only publish messages why does it create a consumer queue. Is there anything that I've missed setting in the properties?

Comments

  • giri
    giri Member, Administrator, Employee Posts: 103 admin

    Hi Varun,

    My quick take on this is - the queue is created not because you have defined binding (someEvent-out-0), maybe because you have a Consumer function implemented in your application. If you comment/remove that, I believe the queue won't be created.

    Can you check on this, please?

    --Giri

  • VarunDamani
    VarunDamani Member Posts: 14

    Hi @giri , there isn't any consumer function that I have defined explicitly. I've just autowired StreamBridge and calling the send method to publish events like the following:

    streamBridge.send("someEvent-out-0", event)

  • arih
    arih Member, Employee Posts: 125 Solace Employee

    Hi @VarunDamani

    I've tried with a simple SCS app, and like Giri said, this shouldn't have created the queue. Especially since this is "out" binding. My code also uses streamBridge.send( the topic address , the POJO/payload ) , but I'm not sure why you'd put the binding name there.

    But, we do know that Queue creation is logged in the event.log

    example: 2021-09-06T23:22:09.807+00:00 <local3.notice> 589c4eb18123 event: CLIENT: CLIENT_CLIENT_CREATE_ENDPOINT: default Aris-MBP/81736/#000f0001/-XBFz0ctVv Client (15) Aris-MBP/81736/#000f0001/-XBFz0ctVv username default Create Durable Queue scst/wk/cgp2/plain/patient/data/raw/_/bp: From (), AccessType (Non-Exclusive), Quota (5000MB), MaxMessageSize (10000000B), AllOthersPermission(Read|Consume), RespectTTL(No), RejectMsgToSenderOnDiscard(Yes), MaxRedelivery(0)

    If you have access to the logs / cli you can check the log entry when the durable queue got created, and find that client ID on either CLI or web manager - just to confirm if your app did that. And if so, we should investigate more on the complete code I think.

  • VarunDamani
    VarunDamani Member Posts: 14

    Hi @arih ,

    Here is the application startup log

    Here you can see that there is a temporary queue being created and it's binding to a consumer.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @VarunDamani,

    Is the orderPlacedEvent function a Functional Interface? For example, does it return a java.util.function.Function or java.util.function.Consumer? If so then cloud stream might be automatically picking it up and configuring it with default bindings. Note that I don't believe this should happen if you are defining spring.cloud.function.definition and the function is not called out in that config.....any chance you can share the actual code/config? Obviously feel free to take out the business logic.

  • VarunDamani
    VarunDamani Member Posts: 14
    edited September 2021 #7

    Hi @marc
    Here is the piece of code that is handling the external event. And on any event from an external source, it triggers streamBridge.source(binding, event)

    The code is in Kotlin. There isn't any other configuration that I am setting anywhere else.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hey @VarunDamani,

    Interesting...so it looks like the EventHandler interface has the @StreamListener annotation on it. This annotation was used in Spring Cloud Stream v2 to bind an input channel....I'm assuming that Spring Cloud Stream is picking this up and saying "hey, you want this bound to a messaging channel". If you don't want this function to be triggered via a message can you just remove the @EventHandler annotation?

    @Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    @MessageMapping
    @Documented
    @StreamListener
    @interface EventHandler {
    

    Hope that does the trick. Let me know.

  • VarunDamani
    VarunDamani Member Posts: 14

    Hi @marc , the @EventHandler annotation is not from Spring instead it is from a third-party tool that I am using. I cannot remove the annotation.
    This annotation is the one that catches external events.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    @VarunDamani,

    Interesting, can you share what the 3rd party tool is? Where is it connecting to receive the events? That would help figure out what the proper path forward would be.

  • VarunDamani
    VarunDamani Member Posts: 14

    Hi @marc , I am using Axon framework. It's an event store. The @EventHandler annotation is part of this framework's api.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin
    edited September 2021 #12

    Thanks. I think that might be a different class than what I was thinking so 2 questions:
    1. is this the eventhandler class? https://github.com/AxonFramework/AxonFramework/blob/master/messaging/src/main/java/org/axonframework/eventhandling/EventHandler.java
    2. Where is the app actually receiving events from? (sorry - not familiar with that framework)

  • VarunDamani
    VarunDamani Member Posts: 14

    Hi @marc , yes that is the class and Axon is an event-store. So, it has events that we subscribe to.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @VarunDamani,

    Okay, so to me it looks like your issue is very similar to this: https://stackoverflow.com/questions/59181853/is-it-possible-to-disable-spring-cloud-streams-functional-binding-for-a-specifi

    Can you try the suggestions that Oleg made in that post and see if they work for you? I'm also following up with Oleg directly to see if there is a newer suggested approach since 2019.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @VarunDamani,

    I heard back from Oleg and it looks like they made an enhancement for this exact scenario. You can check it out in docs here, but the summary is to set spring.cloud.stream.function.autodetect property to false.

    Hope that works for you!

  • VarunDamani
    VarunDamani Member Posts: 14

    Hi @marc , I tried setting this property along with the configurations pasted earlier but the Queue is still being created with a subscription.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin
    edited September 2021 #17

    Hi @VarunDamani,

    I'm wondering if it could be that you need a newer version of Spring Cloud Stream. Can you check the FunctionConfiguration class on your classpath and see if it has the logic to use autoDetect in it?

    This is the method which seems to use it:
    https://github.com/spring-cloud/spring-cloud-stream/blob/main/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java#L892:L910

    Note that I had spring-cloud.version 2020.0.2 which did NOT pick it up, but changing to 2020.0.4 it seems to be correct. 2020.0.4 brought in spring-cloud-stream-3.1.4 for me.

  • soniya
    soniya Member Posts: 23
    edited June 2022 #18

    Hi.. @marc


    I am using StreamBridge .send method to bind the data but did not get any message data in below out put...

    @Bean public Consumer<DataCode> event1(StreamBridge streamBridge) { return ds
    	 -> { DataCode dataCode = new DataCode();
    	 dataCode.setEventType(ds.getEventType().toUpperCase());
    	 dataCode.setNetwork(ds.getNetwork().toUpperCase());
    	  
    	 String topic = TOPIC_PREFIX + ds.getEventType() + "/" + ds.getNetwork();
    	 //System.out.println("Event E1 for order creation for Network1-----------------------" +topic +ds.getEventType() +ds.getNetwork()); 
    	 streamBridge.send(topic,dataCode);
    	  
    	 }; }
    

    output:----I am getting <message handler ID: 18bab7d0-e7d1-4859-b543-253f00bb5cf2> instead of event and network value. please suggest.{"eventType":"update2",

    "network": "network3"}

    com/Supply/Order/update2/network3 <message handler ID: 18bab7d0-e7d1-4859-b543-253f00bb5cf2>

  • giri
    giri Member, Administrator, Employee Posts: 103 admin

    Hi @soniya - Does the Datacode have default serialization methods? If not, it might look like this with just ID. Check it out and let us know.

  • soniya
    soniya Member Posts: 23

    Yes I have but still not working always I an getting  cid------>: null

    public class DataCode implements Serializable {

    private static final long serialVersionUID = 1L;

  • giri
    giri Member, Administrator, Employee Posts: 103 admin
    edited July 2022 #21

    Hi @soniya - Need more information. It could be due to a content-encoding issue.

    Please check your cloud.stream.bindings.event1 settings. If you have "useNativeEncoding" property set, turn it off. If not set, just set it as false - like "useConentEncoding: false" and try. Just wanted to check if this makes a difference in your setup.

    Example:

    spring:
     cloud:
      function:
       definition: consumer
      stream:
       bindings:
        consumer-in-0:
         destination: 'community/consume/topic'
         group: community
         consumer:
          useNativeDecoding: false