Try PubSub+

StreamBridge creating consumer and it's subscription

VarunDamaniVarunDamani Member Posts: 11
edited September 6 in General Discussions #1

I have created a Spring cloud stream application that publishes messages to Solace. The publish message is triggered by an external event. I am using StreamBridge to achieve this functionality of publishing messages.

My application.properties look like this

Problem: when the application starts it creates a consumer queue and a subscription to it.

Question: Since my application's job is to only publish messages why does it create a consumer queue. Is there anything that I've missed setting in the properties?

Comments

  • girigiri Member, Administrator, Employee Posts: 29 admin

    Hi Varun,

    My quick take on this is - the queue is created not because you have defined binding (someEvent-out-0), maybe because you have a Consumer function implemented in your application. If you comment/remove that, I believe the queue won't be created.

    Can you check on this, please?

    --Giri

  • VarunDamaniVarunDamani Member Posts: 11

    Hi @giri , there isn't any consumer function that I have defined explicitly. I've just autowired StreamBridge and calling the send method to publish events like the following:

    streamBridge.send("someEvent-out-0", event)

  • ariharih Member, Employee Posts: 78 Solace Employee

    Hi @VarunDamani

    I've tried with a simple SCS app, and like Giri said, this shouldn't have created the queue. Especially since this is "out" binding. My code also uses streamBridge.send( the topic address , the POJO/payload ) , but I'm not sure why you'd put the binding name there.

    But, we do know that Queue creation is logged in the event.log

    example: 2021-09-06T23:22:09.807+00:00 <local3.notice> 589c4eb18123 event: CLIENT: CLIENT_CLIENT_CREATE_ENDPOINT: default Aris-MBP/81736/#000f0001/-XBFz0ctVv Client (15) Aris-MBP/81736/#000f0001/-XBFz0ctVv username default Create Durable Queue scst/wk/cgp2/plain/patient/data/raw/_/bp: From (), AccessType (Non-Exclusive), Quota (5000MB), MaxMessageSize (10000000B), AllOthersPermission(Read|Consume), RespectTTL(No), RejectMsgToSenderOnDiscard(Yes), MaxRedelivery(0)

    If you have access to the logs / cli you can check the log entry when the durable queue got created, and find that client ID on either CLI or web manager - just to confirm if your app did that. And if so, we should investigate more on the complete code I think.

  • VarunDamaniVarunDamani Member Posts: 11

    Hi @arih ,

    Here is the application startup log

    Here you can see that there is a temporary queue being created and it's binding to a consumer.

  • marcmarc Member, Administrator, Moderator, Employee Posts: 504 admin

    Hi @VarunDamani,

    Is the orderPlacedEvent function a Functional Interface? For example, does it return a java.util.function.Function or java.util.function.Consumer? If so then cloud stream might be automatically picking it up and configuring it with default bindings. Note that I don't believe this should happen if you are defining spring.cloud.function.definition and the function is not called out in that config.....any chance you can share the actual code/config? Obviously feel free to take out the business logic.

  • VarunDamaniVarunDamani Member Posts: 11
    edited September 8 #7

    Hi @marc
    Here is the piece of code that is handling the external event. And on any event from an external source, it triggers streamBridge.source(binding, event)

    The code is in Kotlin. There isn't any other configuration that I am setting anywhere else.

  • marcmarc Member, Administrator, Moderator, Employee Posts: 504 admin

    Hey @VarunDamani,

    Interesting...so it looks like the EventHandler interface has the @StreamListener annotation on it. This annotation was used in Spring Cloud Stream v2 to bind an input channel....I'm assuming that Spring Cloud Stream is picking this up and saying "hey, you want this bound to a messaging channel". If you don't want this function to be triggered via a message can you just remove the @EventHandler annotation?

    @Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    @MessageMapping
    @Documented
    @StreamListener
    @interface EventHandler {
    

    Hope that does the trick. Let me know.

  • VarunDamaniVarunDamani Member Posts: 11

    Hi @marc , the @EventHandler annotation is not from Spring instead it is from a third-party tool that I am using. I cannot remove the annotation.
    This annotation is the one that catches external events.

Sign In or Register to comment.