Consume from multiple queues in Spring Cloud Stream

marc Member, Administrator, Moderator, Employee Posts: 959 admin
edited September 2020 in Connectors & Integrations #1

Yesterday a developer asked me how she could have multiple functions in her Spring Cloud Stream microservice each consume from its own queue. Luckily the framework makes this relatively simple to accomplish, but the devil is in the details: your configuration must match your code. So I figured I'd share the steps and the things to watch out for.

  • Step 1: Code your Function
    The first step is of course to write the code! Since we're trying to consume from a queue, you would create a bean of type java.util.function.Consumer (which only consumes) or java.util.function.Function (which consumes and also returns a result to publish). For the example we'll use one of each.
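import java.util.function.Consumer;
import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class MyCloudStreamApp {

    public static void main(String[] args) {
        SpringApplication.run(MyCloudStreamApp.class, args);
    }

    // Consumes each incoming payload and prints it.
    @Bean
    public Consumer<String> consume() {
        return System.out::println;
    }

    // Consumes a payload, logs it, and returns the uppercased value to be published.
    @Bean
    public Function<String, String> uppercase() {
        return v -> {
            System.out.println("Uppercasing: " + v);
            return v.toUpperCase();
        };
    }
}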
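    Because the beans are plain java.util.function types, their logic can be exercised without a broker or even a Spring context. The harness below is a minimal sketch of that idea; the FunctionSmokeTest class name is hypothetical and not part of the original app, and the methods simply repeat the bean bodies without the @Bean annotation.

```java
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical stand-alone harness; it only mirrors the bean logic from Step 1.
class FunctionSmokeTest {

    // Same body as the consume() bean, minus the @Bean annotation.
    static Consumer<String> consume() {
        return System.out::println;
    }

    // Same body as the uppercase() bean, minus the @Bean annotation.
    static Function<String, String> uppercase() {
        return v -> {
            System.out.println("Uppercasing: " + v);
            return v.toUpperCase();
        };
    }

    public static void main(String[] args) {
        // Call the functions directly, the way the framework would per message.
        consume().accept("hello");
        String result = uppercase().apply("hello");
        System.out.println(result);
    }
}
```

    This only checks the function logic; it says nothing about whether the bindings in the later steps are configured correctly.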
  • Step 2: Make Cloud Stream aware of our functions
    Now that our functions are coded, we need to tell Cloud Stream which functional beans it should attempt to bind to external destinations. We do that with the spring.cloud.function.definition property.
    In the example Spring config below, I list both of the functions we coded in Step 1. Note that the function names are delimited by a semicolon and must match the bean names, which default to the method names in the code.
spring:
  cloud:
    function:
      definition: consume;uppercase
  • Step 3: Configure the Bindings
    Now that the Cloud Stream framework is aware of our functions, we need to configure the bindings. The framework uses the bindings to tell the Cloud Stream Binder how to connect our function(s) to the messaging system. In the case of the Solace PubSub+ Binder this means consuming from a queue and publishing to a topic. The example config below extends our config from Step 2 to include the bindings configuration. Note the binding naming convention: input bindings are named functionName-in-index and output bindings are named functionName-out-index, where index is always 0 for typical single input/output functions. Since consume is a Consumer it only has an input binding, while uppercase has both an input and an output binding.
spring:
  cloud:
    function:
      definition: consume;uppercase
    stream:
      bindings:
        consume-in-0:
          destination: Q.CONSUME
        uppercase-in-0:
          destination: Q.UPPERCASE
        uppercase-out-0:
          destination: spring/sample/string/uppercase
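    Since a typo in a binding name is an easy mistake to make, it can help to see the naming convention spelled out as code. The helper below is a minimal sketch that derives the expected binding names from a function definition string; the BindingNames class and its methods are hypothetical illustrations, not a Spring Cloud Stream API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper that spells out the binding naming convention.
class BindingNames {

    // Input binding name: functionName + "-in-" + index.
    static String input(String functionName, int index) {
        return functionName + "-in-" + index;
    }

    // Output binding name: functionName + "-out-" + index.
    static String output(String functionName, int index) {
        return functionName + "-out-" + index;
    }

    // Splits a semicolon-delimited definition and lists the index-0 input bindings.
    static List<String> inputBindings(String definition) {
        List<String> names = new ArrayList<>();
        for (String fn : definition.split(";")) {
            names.add(input(fn.trim(), 0));
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(inputBindings("consume;uppercase"));
        System.out.println(output("uppercase", 0));
    }
}
```

    The helper does not know whether a function is a Consumer or a Function, so it is up to you to skip the output binding for consumers such as consume.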
  • Step 4: Add Topic Subscriptions
    If you're using Solace as your event broker, the next thing you'll likely want to do is add Topic Subscriptions to your queues (to follow the best practice of publishing to topics and consuming from queues). This tells the broker which messages you want attracted to each queue for consumption. You can do this by specifying the spring.cloud.stream.solace.bindings.BINDINGNAME.consumer.queueAdditionalSubscriptions property, where BINDINGNAME matches the specific binding whose queue you want the subscriptions added to. The fragment below nests under spring.cloud.stream, alongside the bindings section from Step 3, and multiple subscriptions are separated by commas.
      solace: 
        bindings: 
          consume-in-0: 
            consumer: 
              queueAdditionalSubscriptions: spring/sample/consume
          uppercase-in-0:
            consumer:
              queueAdditionalSubscriptions: spring/sample/uppercase,wildcards/*/accepted/>
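    To get a feel for which topics such subscriptions would attract, here is a rough sketch of the wildcard matching. It is a simplified approximation of Solace wildcard semantics as I understand them (* matches within a single level, optionally after a prefix, and > as the last level matches one or more remaining levels); the broker does the real matching, and the SubscriptionSketch class is purely hypothetical.

```java
// Hypothetical, simplified model of topic subscription matching.
class SubscriptionSketch {

    // Returns true if the topic matches a single subscription.
    static boolean matches(String subscription, String topic) {
        String[] sub = subscription.split("/");
        String[] top = topic.split("/");
        for (int i = 0; i < sub.length; i++) {
            String level = sub[i];
            if (level.equals(">") && i == sub.length - 1) {
                // '>' in the last level needs at least one more topic level.
                return top.length > i;
            }
            if (i >= top.length) {
                return false;
            }
            if (level.endsWith("*")) {
                // '*' matches the rest of this level after an optional prefix.
                String prefix = level.substring(0, level.length() - 1);
                if (!top[i].startsWith(prefix)) {
                    return false;
                }
            } else if (!level.equals(top[i])) {
                return false;
            }
        }
        return sub.length == top.length;
    }

    // Checks a comma-separated list, as used by queueAdditionalSubscriptions.
    static boolean attracts(String subscriptions, String topic) {
        for (String s : subscriptions.split(",")) {
            if (matches(s.trim(), topic)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        String subs = "spring/sample/uppercase,wildcards/*/accepted/>";
        System.out.println(attracts(subs, "wildcards/order/accepted/eu/1"));
        System.out.println(attracts(subs, "wildcards/order/rejected/eu"));
    }
}
```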

At this point we have the consume function processing events that arrive on the Q.CONSUME queue and the uppercase function processing events that arrive on the Q.UPPERCASE queue. Each of those queues is configured with its own topic subscriptions, which can use wildcards to allow fine-grained filtering of the events that we want processed.

Hope that's useful!

Comments

  • giri
    giri Member, Administrator, Employee Posts: 116 admin

    Thanks, Marc! The benefits of programming to a framework come from the permutations and combinations of configuration it offers, each of which could serve a different use case. However, that comes at the cost of dealing with proper indentation (if using YAML), the proper order of attributes in the hierarchy, strict naming conventions, etc. But it is worth it IMO 👍️

  • Tilak
    Tilak Member Posts: 6

    Hi Marc,

    When we have separate processor functions, it works fine, but when the same processor is given multiple names as below

    @Bean({"processMessage1","processMessage2"})
    Function<Message<List>, List<Message>> receiveMessage() {

    }

    it throws the error below for one of the processors.

    org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.processMessage2-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers.

    In our case the processor logic is the same for messages consumed from multiple queues. Is there any option to avoid duplicating code?

    Thanks

  • giri
    giri Member, Administrator, Employee Posts: 116 admin

    Hi @Tilak - I had the opportunity to talk with a senior Spring Cloud Stream team member. He confirmed that this is not supported, as it raises complexities around resource/connection creation and management. However, he was open to looking into it if you open a GitHub issue on the Spring Cloud Stream repo explaining why you need it and what business case it serves.

    I would highly recommend that you open the GitHub issue with details if this is a critical use case for you.
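    In the meantime, one possible way to avoid duplicating the logic is to keep separately named functions, which the question above notes works fine, and have each one delegate to a single shared method. This is only a sketch under assumptions: the SharedProcessorSketch class, the process method, and the String payload type are hypothetical stand-ins for the real processor, and in a Spring app each factory method would carry its own @Bean annotation.

```java
import java.util.function.Function;

// Hypothetical sketch: two named functions sharing one implementation.
class SharedProcessorSketch {

    // The shared logic lives in exactly one place (stand-in for the real processing).
    static String process(String payload) {
        return payload.trim().toUpperCase();
    }

    // In a Spring app this would be annotated with @Bean.
    static Function<String, String> processMessage1() {
        return SharedProcessorSketch::process;
    }

    // A second, separately named function that reuses the same logic.
    static Function<String, String> processMessage2() {
        return SharedProcessorSketch::process;
    }

    public static void main(String[] args) {
        System.out.println(processMessage1().apply(" from queue one "));
        System.out.println(processMessage2().apply(" from queue two "));
    }
}
```

    With this shape, the function definition would list processMessage1;processMessage2 and each function would get its own processMessageN-in-0 binding, following the steps in the original post.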