StreamBridge creating a consumer and its subscription

VarunDamani
VarunDamani Member Posts: 14
edited September 6 in General Discussions #1

I have created a Spring Cloud Stream application that publishes messages to Solace. Publishing is triggered by an external event, and I am using StreamBridge to send the messages.

My application.properties looks like this

Problem: When the application starts, it creates a consumer queue and adds a subscription to it.

Question: Since my application's only job is to publish messages, why does it create a consumer queue? Is there anything I've missed in the properties?

Comments

  • giri
    giri Member, Administrator, Employee Posts: 34 admin

    Hi Varun,

    My quick take: the queue is probably not being created because of the binding you defined (someEvent-out-0), but because a Consumer function is implemented somewhere in your application. If you comment it out or remove it, I believe the queue won't be created.

    Can you check on this, please?

    --Giri

  • VarunDamani
    VarunDamani Member Posts: 14

    Hi @giri, I haven't explicitly defined any consumer function. I've just autowired StreamBridge and am calling its send method to publish events, like this:

    streamBridge.send("someEvent-out-0", event)

  • arih
    arih Member, Employee Posts: 96 Solace Employee

    Hi @VarunDamani

    I tried this with a simple SCS app and, as Giri said, it shouldn't have created the queue, especially since this is an "out" binding. My code also uses streamBridge.send(topic address, POJO/payload), but I'm not sure why you'd put the binding name there.

    However, we do know that queue creation is logged in event.log.

    example: 2021-09-06T23:22:09.807+00:00 <local3.notice> 589c4eb18123 event: CLIENT: CLIENT_CLIENT_CREATE_ENDPOINT: default Aris-MBP/81736/#000f0001/-XBFz0ctVv Client (15) Aris-MBP/81736/#000f0001/-XBFz0ctVv username default Create Durable Queue scst/wk/cgp2/plain/patient/data/raw/_/bp: From (), AccessType (Non-Exclusive), Quota (5000MB), MaxMessageSize (10000000B), AllOthersPermission(Read|Consume), RespectTTL(No), RejectMsgToSenderOnDiscard(Yes), MaxRedelivery(0)

    If you have access to the logs or the CLI, you can find the log entry from when the durable queue was created and look up that client ID in either the CLI or the web manager, just to confirm whether your app created it. If it did, I think we should look more closely at the complete code.
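
    To confirm which client created a queue, the relevant fields can be pulled out of a CLIENT_CLIENT_CREATE_ENDPOINT line like the one above. The following is a minimal Java sketch. The class name EndpointCreateEvent and the regular expression are illustrative assumptions inferred only from that single sample line (message VPN, then client name, then username and queue name), not from a documented log grammar, so other endpoint types or broker versions may need a different pattern.

    ```java
    import java.util.Optional;
    import java.util.regex.Matcher;
    import java.util.regex.Pattern;

    // Hypothetical helper: extracts who created which queue from one event.log line.
    final class EndpointCreateEvent {
        // Assumed layout, inferred from the sample line in this thread only:
        // CLIENT_CLIENT_CREATE_ENDPOINT: <vpn> <client> ... username <user> Create <type> Queue <name>: ...
        private static final Pattern LINE = Pattern.compile(
            "CLIENT_CLIENT_CREATE_ENDPOINT: (\\S+) (\\S+) .* username (\\S+) Create (.+?) Queue (\\S+): ");

        final String vpn;
        final String clientName;
        final String username;
        final String queueType;
        final String queueName;

        private EndpointCreateEvent(Matcher m) {
            vpn = m.group(1);
            clientName = m.group(2);
            username = m.group(3);
            queueType = m.group(4);
            queueName = m.group(5);
        }

        // Returns an empty Optional when the line does not look like a queue-creation event.
        static Optional<EndpointCreateEvent> parse(String line) {
            Matcher m = LINE.matcher(line);
            return m.find() ? Optional.of(new EndpointCreateEvent(m)) : Optional.empty();
        }

        public static void main(String[] args) {
            // Sample taken from this thread; the timestamp/host prefix and trailing attributes are omitted.
            String sample = "event: CLIENT: CLIENT_CLIENT_CREATE_ENDPOINT: default "
                + "Aris-MBP/81736/#000f0001/-XBFz0ctVv Client (15) Aris-MBP/81736/#000f0001/-XBFz0ctVv "
                + "username default Create Durable Queue scst/wk/cgp2/plain/patient/data/raw/_/bp: From (), "
                + "AccessType (Non-Exclusive)";
            parse(sample).ifPresent(e -> System.out.println(
                e.clientName + " (user " + e.username + ") created " + e.queueType + " queue " + e.queueName));
        }
    }
    ```

    The extracted client name can then be compared with the client name your application's session reports when it connects.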

  • VarunDamani
    VarunDamani Member Posts: 14

    Hi @arih ,

    Here is the application startup log

    Here you can see that a temporary queue is being created and bound to a consumer.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 585 admin

    Hi @VarunDamani,

    Is the orderPlacedEvent function a functional interface? For example, does it return a java.util.function.Function or java.util.function.Consumer? If so, Spring Cloud Stream might be picking it up automatically and configuring it with default bindings. Note that I don't believe this should happen if you define spring.cloud.function.definition and the function is not listed in that config. Any chance you can share the actual code/config? Feel free to take out the business logic.

  • VarunDamani
    VarunDamani Member Posts: 14
    edited September 8 #7

    Hi @marc
    Here is the piece of code that handles the external event. On any event from an external source, it triggers streamBridge.send(binding, event).

    The code is in Kotlin. There isn't any other configuration that I am setting anywhere else.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 585 admin

    Hey @VarunDamani,

    Interesting. It looks like the EventHandler interface has the @StreamListener annotation on it. This annotation was used in Spring Cloud Stream v2 to bind an input channel, so I'm assuming that Spring Cloud Stream is picking this up and saying "hey, you want this bound to a messaging channel". If you don't want this function to be triggered by a message, can you just remove the @EventHandler annotation?

    @Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    @MessageMapping
    @Documented
    @StreamListener
    @interface EventHandler {
    

    Hope that does the trick. Let me know.

  • VarunDamani
    VarunDamani Member Posts: 14

    Hi @marc, the @EventHandler annotation is not from Spring; it comes from a third-party tool that I am using, so I cannot remove it.
    This annotation is the one that catches external events.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 585 admin

    @VarunDamani,

    Interesting, can you share what the 3rd party tool is? Where is it connecting to receive the events? That would help figure out what the proper path forward would be.

  • VarunDamani
    VarunDamani Member Posts: 14

    Hi @marc, I am using Axon Framework. It's an event store. The @EventHandler annotation is part of this framework's API.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 585 admin
    edited September 21 #12

    Thanks. I think that might be a different class than the one I was thinking of, so two questions:
    1. Is this the EventHandler class? https://github.com/AxonFramework/AxonFramework/blob/master/messaging/src/main/java/org/axonframework/eventhandling/EventHandler.java
    2. Where is the app actually receiving events from? (Sorry, I'm not familiar with that framework.)

  • VarunDamani
    VarunDamani Member Posts: 14

    Hi @marc, yes, that is the class. Axon is an event store, so it holds the events that we subscribe to.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 585 admin

    Hi @VarunDamani,

    Okay, so to me it looks like your issue is very similar to this: https://stackoverflow.com/questions/59181853/is-it-possible-to-disable-spring-cloud-streams-functional-binding-for-a-specifi

    Can you try the suggestions that Oleg made in that post and see if they work for you? I'm also following up with Oleg directly to see if there is a newer suggested approach since 2019.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 585 admin

    Hi @VarunDamani,

    I heard back from Oleg, and it looks like they made an enhancement for this exact scenario. It is described in the docs; the summary is to set the spring.cloud.stream.function.autodetect property to false.
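
    As a sketch, and assuming the setting goes in the same application.properties file mentioned at the top of the thread, the entry would look something like this:

    ```properties
    # Disable Spring Cloud Stream's automatic discovery of Function/Consumer/Supplier beans
    spring.cloud.stream.function.autodetect=false
    ```

    The existing output binding configuration stays as it is; this line only affects automatic function discovery.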

    Hope that works for you!

  • VarunDamani
    VarunDamani Member Posts: 14

    Hi @marc, I tried setting this property along with the configuration pasted earlier, but the queue is still being created with a subscription.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 585 admin
    edited September 28 #17

    Hi @VarunDamani,

    I'm wondering if it could be that you need a newer version of Spring Cloud Stream. Can you check the FunctionConfiguration class on your classpath and see if it has the logic to use autoDetect in it?

    This is the method which seems to use it:
    https://github.com/spring-cloud/spring-cloud-stream/blob/main/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java#L892:L910
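
    One rough way to see which jar FunctionConfiguration is loaded from is to print the class's code source from inside the application. A minimal Java sketch follows. ClassOrigin and locate are hypothetical names, and the sketch assumes the jar file name in the printed location carries the version, as is usual for Maven artifacts. It only reports where the class comes from; it does not inspect whether the autoDetect logic is present.

    ```java
    import java.security.CodeSource;

    // Hypothetical helper: reports where a class was loaded from.
    final class ClassOrigin {
        // Returns the code source location of the named class, or a short note if it cannot be determined.
        static String locate(String className) {
            try {
                // initialize=false: load the class without running its static initializers.
                Class<?> type = Class.forName(className, false, ClassOrigin.class.getClassLoader());
                CodeSource source = type.getProtectionDomain().getCodeSource();
                if (source == null || source.getLocation() == null) {
                    return "no code source (JDK/bootstrap class)";
                }
                return source.getLocation().toString();
            } catch (ClassNotFoundException | LinkageError e) {
                return "not on classpath: " + e;
            }
        }

        public static void main(String[] args) {
            System.out.println(
                locate("org.springframework.cloud.stream.function.FunctionConfiguration"));
        }
    }
    ```

    Calling ClassOrigin.locate(...) from within the running application, for example at startup, makes sure the application's own class loader is the one being queried.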

    Note that I had spring-cloud.version 2020.0.2, which did NOT pick it up, but after changing to 2020.0.4 it seems to work correctly. 2020.0.4 brought in spring-cloud-stream-3.1.4 for me.