Yesterday I got asked by a developer how she could have multiple functions in her Spring Cloud Stream microservice each consume from its’ own queue. Luckily the framework makes this relatively simple to accomplish, but the devil is in the details as your configuration must match your code so I figured I’d share the steps and things to watch out for.
- Step 1: Code your Function
The first step is of course to write the code! Since we’re trying to consume from a queue you would create a bean of typejava.util.function.Consumer
orjava.util.function.Function
. For the example we’ll use one of each.
@SpringBootApplication
public class MyCloudStreamApp {
public static void main(String[] args) {
SpringApplication.run(MyCloudStreamApp.class);
}
@Bean
public Consumer<String> consume() {
return System.out::println;
}
@Bean
public Function<String, String> uppercase(){
return v -> {
System.out.println("Uppercasing: " + v);
return v.toUpperCase();
};
}
}
- Step 2: Make Cloud Stream aware of our functions
Now that we have our functions coded we need to specify which functional beans we want Cloud Stream to attempt to bind to external destinations using thespring.cloud.function.definition
property.
In the example spring config below you’ll see that I defined both of our functions we coded in step 1. Note that the functions are delimited by a semicolon and match the method name in the code.
spring:
cloud:
function:
definition: consume;uppercase
- Step 3: Configure the Bindings
Now that the Cloud Stream framework is aware of our functions we need to actually configure the bindings. The bindings are used by the framework to tell the Cloud Stream Binder how to connect our function(s) to the messaging system. In the case of the Solace PubSub+ Binder this means consuming from a queue and publishing to a topic. The example config below extends our config from Step 2 to include the bindings configuration. Note that the binding name conventions specify the input binding names asfunctionName
+-in-
+index
andfunctionName
+-out-
+index
for the output bindings where index is always 0 for typical single input/output functions.
spring:
cloud:
function:
definition: consume;uppercase
stream:
bindings:
consume-in-0:
destination: Q.CONSUME
uppercase-in-0:
destination: Q.UPPERCASE
uppercase-out-0:
destination: spring/sample/string/uppercase
- Step 4: Add Topic Subscriptions
If you’re using Solace as your event broker the next thing that you’ll likely want to do is add Topic Subscriptions to your queue (to follow best practices of publish to topics and consume from queues). This will tell the broker which messages you want attracted to your queue for consumption. You can do this by specifying thespring.cloud.stream.solace.bindings.BINDINGNAME.consumer.queueAdditionalSubscriptions
property, where BINDINGNAME matches the specific binding whose queue you want the subscriptions added to.
solace:
bindings:
consume-in-0:
consumer:
queueAdditionalSubscriptions: spring/sample/consume
uppercase-in-0:
consumer:
queueAdditionalSubscriptions: spring/sample/uppercase,wildcards/*/accepted/>
At this point we have the consume
function processing events that arrive on the Q.CONSUME
queue and the uppercase
function processing events that arrive on the Q.UPPERCASE
queue. And each of those queues are configured with their own unique topic subscriptions which can use wildcards to allow for fine grained filtering of the events that we want processed.
Hope that’s useful!