Consume from multiple queues in Spring Cloud Stream
Yesterday a developer asked me how she could have multiple functions in her Spring Cloud Stream microservice, each consuming from its own queue. Luckily the framework makes this relatively simple, but the devil is in the details because your configuration must match your code, so I figured I'd share the steps and the things to watch out for.
- Step 1: Code your Function
The first step is, of course, to write the code! Since we're trying to consume from a queue, you would create a bean of type java.util.function.Consumer or java.util.function.Function. For the example we'll use one of each.
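import java.util.function.Consumer;
import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class MyCloudStreamApp {

    public static void main(String[] args) {
        SpringApplication.run(MyCloudStreamApp.class, args);
    }

    // Consumer: receives each message payload and prints it.
    @Bean
    public Consumer<String> consume() {
        return System.out::println;
    }

    // Function: receives a payload, logs it, and returns the uppercased value
    // to be published on the output binding.
    @Bean
    public Function<String, String> uppercase() {
        return v -> {
            System.out.println("Uppercasing: " + v);
            return v.toUpperCase();
        };
    }
}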
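Because these beans are plain java.util.function types, their logic can be exercised without a broker or a Spring context. The sketch below is only an illustration: PlainFunctionsSketch and recordingConsumer are hypothetical names that are not part of the app above, and the consumer collects payloads into a list instead of printing them so the result can be checked.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper class (not part of the original app): it mirrors the
// Step 1 logic so it can be run without Spring or a messaging system.
final class PlainFunctionsSketch {

    // Same transformation as the uppercase() bean, minus the log line.
    static Function<String, String> uppercase() {
        return v -> v.toUpperCase();
    }

    // Stand-in for the consume() bean: records payloads instead of printing them.
    static Consumer<String> recordingConsumer(List<String> sink) {
        return sink::add;
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        recordingConsumer(received).accept("hello");
        System.out.println(received);                   // prints [hello]
        System.out.println(uppercase().apply("hello")); // prints HELLO
    }
}
```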
- Step 2: Make Cloud Stream aware of our functions
Now that we have our functions coded, we need to specify which functional beans we want Cloud Stream to attempt to bind to external destinations. We do that with the spring.cloud.function.definition property.
In the example Spring config below, you'll see that I listed both of the functions we coded in Step 1. Note that the function names are delimited by a semicolon and must match the method names in the code (a @Bean method's name is its default bean name).
spring:
  cloud:
    function:
      definition: consume;uppercase
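Since a typo in this property is an easy way for config and code to drift apart, here is a rough sketch of the kind of check you could run in a unit test. It is not how the framework parses the property: it only splits on semicolons, and DefinitionCheckSketch, parse, and missing are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not framework code: compares the names listed in
// spring.cloud.function.definition against the bean names you expect.
final class DefinitionCheckSketch {

    // Splits a semicolon-delimited definition into trimmed, non-empty names.
    static List<String> parse(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    // Returns the definition entries that have no matching bean name.
    static List<String> missing(String definition, Set<String> beanNames) {
        List<String> result = new ArrayList<>();
        for (String name : parse(definition)) {
            if (!beanNames.contains(name)) {
                result.add(name);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Set<String> beans = Set.of("consume", "uppercase");
        System.out.println(missing("consume;uppercase", beans)); // prints []
        System.out.println(missing("consume;upperCase", beans)); // prints [upperCase]
    }
}
```

The second call shows the failure mode to watch for: the names are case-sensitive, so upperCase does not match the uppercase bean.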
- Step 3: Configure the Bindings
Now that the Cloud Stream framework is aware of our functions, we need to actually configure the bindings. The framework uses the bindings to tell the Cloud Stream Binder how to connect our function(s) to the messaging system. In the case of the Solace PubSub+ Binder, this means consuming from a queue and publishing to a topic. The example config below extends our config from Step 2 to include the bindings configuration. Note that the binding naming convention is functionName + -in- + index for input bindings and functionName + -out- + index for output bindings, where index is always 0 for typical single input/output functions.
spring:
  cloud:
    function:
      definition: consume;uppercase
    stream:
      bindings:
        consume-in-0:
          destination: Q.CONSUME
        uppercase-in-0:
          destination: Q.UPPERCASE
        uppercase-out-0:
          destination: spring/sample/string/uppercase
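The naming convention is easy to get wrong by a character, so here it is spelled out as a tiny sketch. BindingNamesSketch and its methods are hypothetical helpers for illustration only; the framework derives these names itself.

```java
// Hypothetical helper: builds binding names following the
// functionName + "-in-"/"-out-" + index convention described above.
final class BindingNamesSketch {

    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        // A Consumer only has an input binding.
        System.out.println(input("consume", 0));    // prints consume-in-0
        // A Function has both an input and an output binding.
        System.out.println(input("uppercase", 0));  // prints uppercase-in-0
        System.out.println(output("uppercase", 0)); // prints uppercase-out-0
    }
}
```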
- Step 4: Add Topic Subscriptions
If you're using Solace as your event broker, the next thing you'll likely want to do is add Topic Subscriptions to your queues (to follow the best practice of publishing to topics and consuming from queues). This tells the broker which messages you want attracted to each queue for consumption. You can do this by specifying the spring.cloud.stream.solace.bindings.BINDINGNAME.consumer.queueAdditionalSubscriptions property, where BINDINGNAME matches the specific binding whose queue you want the subscriptions added to.
# nested under spring.cloud.stream
solace:
  bindings:
    consume-in-0:
      consumer:
        queueAdditionalSubscriptions: spring/sample/consume
    uppercase-in-0:
      consumer:
        queueAdditionalSubscriptions: spring/sample/uppercase,wildcards/*/accepted/>
At this point we have the consume function processing events that arrive on the Q.CONSUME queue and the uppercase function processing events that arrive on the Q.UPPERCASE queue. Each of those queues is configured with its own unique topic subscriptions, which can use wildcards to allow for fine-grained filtering of the events that we want processed.
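To get a feel for which topics a wildcard subscription would attract, here is a simplified sketch of the matching idea. It is not the broker's implementation and skips edge cases: it assumes * matches a whole level (or the rest of a level after a prefix) and that > as the final level matches one or more remaining levels. TopicMatchSketch and matches are hypothetical names, and the sample topics are made up.

```java
// Simplified, hypothetical sketch of Solace-style topic wildcard matching.
// Assumptions: '*' ends a level and matches the rest of that level;
// '>' is only special as the last level and needs at least one more level.
final class TopicMatchSketch {

    static boolean matches(String subscription, String topic) {
        String[] sub = subscription.split("/");
        String[] top = topic.split("/");
        for (int i = 0; i < sub.length; i++) {
            String level = sub[i];
            boolean last = i == sub.length - 1;
            if (last && level.equals(">")) {
                // '>' must cover at least one remaining topic level.
                return top.length > i;
            }
            if (i >= top.length) {
                return false; // topic has fewer levels than the subscription
            }
            if (level.endsWith("*")) {
                String prefix = level.substring(0, level.length() - 1);
                if (!top[i].startsWith(prefix)) {
                    return false;
                }
            } else if (!level.equals(top[i])) {
                return false;
            }
        }
        // Without a trailing '>', the level counts must be equal.
        return sub.length == top.length;
    }

    public static void main(String[] args) {
        String sub = "wildcards/*/accepted/>";
        System.out.println(matches(sub, "wildcards/order/accepted/us/east")); // prints true
        System.out.println(matches(sub, "wildcards/order/accepted"));         // prints false
        System.out.println(matches(sub, "wildcards/order/rejected/us"));      // prints false
        System.out.println(matches("spring/sample/uppercase", "spring/sample/uppercase")); // prints true
    }
}
```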
Hope that's useful!
Comments
-
Thanks, Marc! The benefits of programming to a framework come from the permutations and combinations it offers for possible configurations, each of which could be a different use case. However, it comes at the cost of dealing with proper indentation (if using YAML), the proper order of attributes in the hierarchy, strict naming conventions, etc. But it is worth it IMO 👍️
-
Hi Marc,
When we have separate processor functions, it works fine, but when the same processor is given multiple names as below
@Bean({"processMessage1","processMessage2"})
Function<Message<List>, List<Message>> receiveMessage() {}
it throws the error below for one of the processors.
org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.processMessage2-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers.
In our case the processor logic is the same for messages consumed from multiple queues. Is there any option to avoid duplicating code?
Thanks
-
Hi @Tilak - I had the opportunity to talk with a senior Spring Cloud Stream team member. He confirmed that this is not supported, as it opens up complexities around resource/connection creation and management. However, he was open to looking into it if you open a GitHub issue on the scst repo explaining why you need it and what business case it serves.
I would highly recommend opening the GitHub issue with details if this is a critical use case for you.