Consume from multiple queues in Spring Cloud Stream

marc Member, Administrator, Moderator, Employee Posts: 584 admin
edited September 2020 in Connectors & Integrations #1

Yesterday a developer asked me how she could have multiple functions in her Spring Cloud Stream microservice each consume from its own queue. Luckily the framework makes this relatively simple, but the devil is in the details: your configuration must match your code. So I figured I'd share the steps and the things to watch out for.

  • Step 1: Code your Function
    The first step is of course to write the code! Since we're trying to consume from a queue, you create a bean of type java.util.function.Consumer (input only) or java.util.function.Function (input and output). For the example we'll use one of each.
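import java.util.function.Consumer;
import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class MyCloudStreamApp {

    public static void main(String[] args) {
        // Pass args through so command-line properties reach the Spring environment
        SpringApplication.run(MyCloudStreamApp.class, args);
    }

    // Input only: receives each message payload and prints it
    @Bean
    public Consumer<String> consume() {
        return System.out::println;
    }

    // Input and output: the returned value is published as a new message
    @Bean
    public Function<String, String> uppercase() {
        return v -> {
            System.out.println("Uppercasing: " + v);
            return v.toUpperCase();
        };
    }
}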
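    Because the beans are plain java.util.function types, their logic can be tried out without a broker or the binder. The sketch below is only an illustration: FunctionSketch is a hypothetical class that is not part of the app above, and its consume variant writes to a caller-supplied list instead of System.out so the effect is easy to see.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-alone class (not part of the original app): it mirrors the
// two beans from Step 1 as plain factory methods so they can be called directly.
class FunctionSketch {

    // Same shape as the consume() bean, but collecting into a list
    // instead of printing, so the result can be inspected.
    static Consumer<String> consume(List<String> sink) {
        return sink::add;
    }

    // Same logic as the uppercase() bean, minus the logging.
    static Function<String, String> uppercase() {
        return v -> v.toUpperCase();
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        consume(received).accept("hello");
        System.out.println(received);                   // prints [hello]
        System.out.println(uppercase().apply("hello")); // prints HELLO
    }
}
```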
  • Step 2: Make Cloud Stream aware of our functions
    Now that we have our functions coded we need to specify which functional beans we want Cloud Stream to attempt to bind to external destinations using the spring.cloud.function.definition property.
    In the example Spring config below you'll see that I defined both of the functions we coded in Step 1. Note that the function names are delimited by a semicolon and must match the bean names, which default to the method names in the code.
spring:
  cloud:
    function:
      definition: consume;uppercase
  • Step 3: Configure the Bindings
    Now that the Cloud Stream framework is aware of our functions, we need to configure the bindings. The framework uses the bindings to tell the Cloud Stream Binder how to connect our function(s) to the messaging system. In the case of the Solace PubSub+ Binder this means consuming from a queue and publishing to a topic. The example config below extends our config from Step 2 to include the bindings configuration. Note the binding naming convention: input bindings are named functionName + "-in-" + index and output bindings are named functionName + "-out-" + index, where index is always 0 for typical single input/output functions. A Consumer only has an input binding; a Function has both.
spring:
  cloud:
    function:
      definition: consume;uppercase
    stream:
      bindings:
        consume-in-0:
          destination: Q.CONSUME
        uppercase-in-0:
          destination: Q.UPPERCASE
        uppercase-out-0:
          destination: spring/sample/string/uppercase
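    Since a mistyped binding name is the easiest way for config and code to drift apart, here is a minimal sketch of the naming convention from Step 3. BindingNames is a hypothetical helper written for this post, not a Spring Cloud Stream API; it only splits a definition string on semicolons and builds the expected binding names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not a Spring Cloud Stream API: it only reproduces the
// naming convention described in Step 3 so config keys can be sanity-checked.
class BindingNames {

    // Split a spring.cloud.function.definition value such as "consume;uppercase".
    static List<String> functionNames(String definition) {
        List<String> names = new ArrayList<>();
        for (String part : definition.split(";")) {
            String name = part.trim();
            if (!name.isEmpty()) {
                names.add(name);
            }
        }
        return names;
    }

    // functionName + "-in-" + index
    static String inputBinding(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // functionName + "-out-" + index
    static String outputBinding(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    public static void main(String[] args) {
        for (String name : functionNames("consume;uppercase")) {
            System.out.println(inputBinding(name, 0)); // consume-in-0, then uppercase-in-0
        }
        System.out.println(outputBinding("uppercase", 0)); // uppercase-out-0
    }
}
```

    The helper does not know whether a bean is a Consumer or a Function, so it is up to you to skip the output binding for consume.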
  • Step 4: Add Topic Subscriptions
    If you're using Solace as your event broker the next thing that you'll likely want to do is add Topic Subscriptions to your queue (to follow best practices of publish to topics and consume from queues). This will tell the broker which messages you want attracted to your queue for consumption. You can do this by specifying the spring.cloud.stream.solace.bindings.BINDINGNAME.consumer.queueAdditionalSubscriptions property, where BINDINGNAME matches the specific binding whose queue you want the subscriptions added to.
      # Nested under spring.cloud.stream, alongside the bindings block from Step 3
      solace:
        bindings:
          consume-in-0:
            consumer:
              queueAdditionalSubscriptions: spring/sample/consume
          uppercase-in-0:
            consumer:
              queueAdditionalSubscriptions: spring/sample/uppercase,wildcards/*/accepted/>
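    To get a feel for what a subscription like wildcards/*/accepted/> attracts, here is a simplified sketch of Solace-style wildcard matching. It assumes that * matches the rest of a single level (optionally after a literal prefix) and that > as the last level matches one or more remaining levels. TopicMatchSketch is a hypothetical class for illustration only; it is not the broker's implementation and ignores edge cases such as empty levels.

```java
// Hypothetical illustration of Solace-style topic wildcards; the real matching
// happens on the broker and covers edge cases this sketch does not model.
class TopicMatchSketch {

    static boolean matches(String subscription, String topic) {
        String[] sub = subscription.split("/", -1);
        String[] top = topic.split("/", -1);
        for (int i = 0; i < sub.length; i++) {
            String level = sub[i];
            // '>' as the final level matches one or more remaining topic levels.
            if (level.equals(">") && i == sub.length - 1) {
                return top.length > i;
            }
            if (i >= top.length) {
                return false; // topic has fewer levels than the subscription
            }
            if (level.endsWith("*")) {
                // '*' matches the rest of this one level after an optional prefix.
                String prefix = level.substring(0, level.length() - 1);
                if (!top[i].startsWith(prefix)) {
                    return false;
                }
            } else if (!level.equals(top[i])) {
                return false;
            }
        }
        // Without a trailing '>', both must have the same number of levels.
        return sub.length == top.length;
    }

    public static void main(String[] args) {
        String sub = "wildcards/*/accepted/>";
        System.out.println(matches(sub, "wildcards/orders/accepted/eu/1")); // true
        System.out.println(matches(sub, "wildcards/orders/accepted"));      // false
        System.out.println(matches(sub, "wildcards/orders/rejected/eu"));   // false
    }
}
```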

At this point we have the consume function processing events that arrive on the Q.CONSUME queue and the uppercase function processing events that arrive on the Q.UPPERCASE queue. Each of those queues is configured with its own topic subscriptions, which can use wildcards to allow fine-grained filtering of the events we want processed.

Hope that's useful!