StreamBridge creating a consumer and its subscription
I have created a Spring Cloud Stream application that publishes messages to Solace. Publishing is triggered by an external event, and I am using StreamBridge to publish the messages.
My application.properties looks like this:
Problem: When the application starts, it creates a consumer queue and a subscription to it.
Question: Since my application's only job is to publish messages, why does it create a consumer queue? Is there anything that I've missed setting in the properties?
Comments
-
Hi Varun,
My quick take: the queue is probably not created because of the binding you defined (someEvent-out-0), but because you have a Consumer function implemented in your application. If you comment it out or remove it, I believe the queue won't be created.
Can you check on this, please?
--Giri
1 -
Hi @giri, there isn't any consumer function that I have defined explicitly. I've just autowired StreamBridge and am calling the send method to publish events, like the following:
streamBridge.send("someEvent-out-0", event)
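For context, a binding name passed to streamBridge.send is normally mapped to a destination in configuration. A minimal sketch, assuming the binding name from the call above; the topic name is hypothetical and not taken from the original application.properties:

```properties
# Hypothetical destination (topic) for the output binding used in streamBridge.send
spring.cloud.stream.bindings.someEvent-out-0.destination=some/event/topic
```

With a mapping like this, the send call should publish to the configured topic rather than to a destination named after the binding.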
0 -
Hi @VarunDamani
I've tried with a simple SCS app, and like Giri said, this shouldn't have created the queue, especially since this is an "out" binding. My code also uses streamBridge.send(the topic address, the POJO/payload), but I'm not sure why you'd put the binding name there.
But we do know that queue creation is logged in the event.log.
example: 2021-09-06T23:22:09.807+00:00 <local3.notice> 589c4eb18123 event: CLIENT: CLIENT_CLIENT_CREATE_ENDPOINT: default Aris-MBP/81736/#000f0001/-XBFz0ctVv Client (15) Aris-MBP/81736/#000f0001/-XBFz0ctVv username default Create Durable Queue scst/wk/cgp2/plain/patient/data/raw/_/bp: From (), AccessType (Non-Exclusive), Quota (5000MB), MaxMessageSize (10000000B), AllOthersPermission(Read|Consume), RespectTTL(No), RejectMsgToSenderOnDiscard(Yes), MaxRedelivery(0)
If you have access to the logs or the CLI, you can check the log entry for when the durable queue was created and look up that client ID in either the CLI or the web manager, just to confirm whether your app created it. If so, I think we should investigate the complete code further.
1 -
Hi @arih ,
Here is the application startup log
Here you can see that a temporary queue is being created and bound to a consumer.
0 -
Hi @VarunDamani,
Is the orderPlacedEvent function a Functional Interface? For example, does it return a java.util.function.Function or java.util.function.Consumer? If so, then Cloud Stream might be automatically picking it up and configuring it with default bindings. Note that I don't believe this should happen if you are defining spring.cloud.function.definition and the function is not called out in that config... Any chance you can share the actual code/config? Obviously feel free to take out the business logic.
0 -
Hi @marc
Here is the piece of code that is handling the external event. On any event from an external source, it triggers streamBridge.source(binding, event)
The code is in Kotlin. There isn't any other configuration that I am setting anywhere else.
0 -
Hey @VarunDamani,
Interesting... so it looks like the EventHandler interface has the @StreamListener annotation on it. This annotation was used in Spring Cloud Stream v2 to bind an input channel... I'm assuming that Spring Cloud Stream is picking this up and saying "hey, you want this bound to a messaging channel". If you don't want this function to be triggered via a message, can you just remove the @EventHandler annotation?
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@StreamListener
@interface EventHandler {
Hope that does the trick. Let me know.
1 -
Hi @marc, the @EventHandler annotation is not from Spring; it is from a third-party tool that I am using, so I cannot remove it.
This annotation is the one that catches external events.
0 -
Interesting, can you share what the 3rd party tool is? Where is it connecting to receive the events? That would help figure out what the proper path forward would be.
0 -
Hi @marc, I am using Axon Framework. It's an event store. The @EventHandler annotation is part of this framework's API.
0 -
Thanks. I think that might be a different class than what I was thinking, so two questions:
1. Is this the EventHandler class? https://github.com/AxonFramework/AxonFramework/blob/master/messaging/src/main/java/org/axonframework/eventhandling/EventHandler.java
2. Where is the app actually receiving events from? (Sorry, I'm not familiar with that framework.)
0 -
Hi @marc, yes, that is the class. Axon is an event store, so it has events that we subscribe to.
0 -
Hi @VarunDamani,
Okay, so to me it looks like your issue is very similar to this: https://stackoverflow.com/questions/59181853/is-it-possible-to-disable-spring-cloud-streams-functional-binding-for-a-specifi
Can you try the suggestions that Oleg made in that post and see if they work for you? I'm also following up with Oleg directly to see if there is a newer suggested approach since 2019.
0 -
Hi @VarunDamani,
I heard back from Oleg, and it looks like they made an enhancement for this exact scenario. You can check it out in the docs here, but the summary is to set the spring.cloud.stream.function.autodetect property to false.
Hope that works for you!
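A minimal sketch of that setting in application.properties, assuming it is added alongside the existing binding configuration:

```properties
# Ask Spring Cloud Stream not to auto-detect functional beans and bind them
spring.cloud.stream.function.autodetect=false
```

Whether the setting takes effect may depend on the Spring Cloud Stream version on the classpath.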
0 -
Hi @marc , I tried setting this property along with the configurations pasted earlier but the Queue is still being created with a subscription.
0 -
Hi @VarunDamani,
I'm wondering if it could be that you need a newer version of Spring Cloud Stream. Can you check the FunctionConfiguration class on your classpath and see if it has the logic to use autoDetect in it?
This is the method which seems to use it:
https://github.com/spring-cloud/spring-cloud-stream/blob/main/spring-cloud-stream/src/main/java/org/springframework/cloud/stream/function/FunctionConfiguration.java#L892:L910
Note that I had spring-cloud.version 2020.0.2, which did NOT pick it up, but after changing to 2020.0.4 it seems to be correct. 2020.0.4 brought in spring-cloud-stream-3.1.4 for me.
0 -
Hi.. @marc
I am using the StreamBridge.send method to send the data, but I am not getting any message data in the output below...
@Bean
public Consumer<DataCode> event1(StreamBridge streamBridge) {
    return ds -> {
        DataCode dataCode = new DataCode();
        dataCode.setEventType(ds.getEventType().toUpperCase());
        dataCode.setNetwork(ds.getNetwork().toUpperCase());
        String topic = TOPIC_PREFIX + ds.getEventType() + "/" + ds.getNetwork();
        //System.out.println("Event E1 for order creation for Network1-----------------------" +topic +ds.getEventType() +ds.getNetwork());
        streamBridge.send(topic, dataCode);
    };
}
Output: I am getting <message handler ID: 18bab7d0-e7d1-4859-b543-253f00bb5cf2> instead of the event and network values. Please suggest.
{"eventType": "update2", "network": "network3"}
com/Supply/Order/update2/network3 <message handler ID: 18bab7d0-e7d1-4859-b543-253f00bb5cf2>
0 -
Hi @soniya, I need more information. It could be due to a content-encoding issue.
Please check your cloud.stream.bindings.event1 settings. If you have the "useNativeEncoding" property set, turn it off. If it is not set, set it to false explicitly, like "useNativeEncoding: false", and try again. I just want to check whether this makes a difference in your setup.
Example:
spring:
  cloud:
    function:
      definition: consumer
    stream:
      bindings:
        consumer-in-0:
          destination: 'community/consume/topic'
          group: community
          consumer:
            useNativeDecoding: false
0