@StreamListener(target = Sink.INPUT, condition = "headers['eventType']=='EmployeeTransferredEvent'"

ajinkyasagar
ajinkyasagar Member Posts: 11

I am using latest version of solace streaming and want to filter messages based on header but I see @StreamListener is deprecated and how to achieve this with properties file
i have seen queueAdditionalSubscriptions but its filtering based on topic and not on header properties.

solace:
#The solace bindings section allows for solace specific configurations to be applied to a channel. A common example is adding topic subscriptions to a queue as shown below.
bindings:
myFunction-in-0:
consumer:
queueAdditionalSubscriptions: abc/pqr

is there any way to achieve this with header property with application.yml

Answers

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 568 admin

    Hi @ajinkyasagar,

    So 2 things:
    1. I just want to ask you to consider if filtering at the topic level might make sense. Think about what you want to filter on and ask, is this something that should be in our topic hierarchy? Is it something that others would want to filter on? If the answer is yes and you can still change the topic I'd consider that as it will allow for optimal performance. You can learn more about Topic Best Practices here
    2. Now to your question, with the deprecation of @StreamListener you can no longer filter on the bean itself, but instead you would use Spring Cloud Function's Routing Function capabilities. Following this pattern will essentially have your routing function receive incoming messages and then intelligently route them to your other functions for processing.

    Hope that helps!

  • ajinkyasagar
    ajinkyasagar Member Posts: 11

    1) regarding first point we dont have controo over producer so he will always produce messages with same topic thats limitation
    2) Regarding routing function i can not achive horizontal scalability as function and router function needs to be in same webservice which will run on single JVM

    With streamlistemer i should have able to deploy two different webservices which can lister to different messages based on condition based on header... This help me in horizontal scaling but this annotation is deprecated...

  • ajinkyasagar
    ajinkyasagar Member Posts: 11

    Instead of streamlistener is there any way i can add properties in yaml file where i can mention condition on header

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 568 admin

    Hi @ajinkyasagar,

    I'm confused by what you mean with the statement below. The filters applied by the @StreamListener previously also only applied to events after they were received from the broker. At that point they are within your app running on a single JVM. Maybe I'm just confused. Can you share an example of what worked with @StreamListener that doesn't work now?

    Regarding routing function i can not achive horizontal scalability as function and router function needs to be in same webservice which will run on single JVM

    Instead of streamlistener is there any way i can add properties in yaml file where i can mention condition on header

    If you're using the routing function (pending issue above) you can specify those via the properties/yaml file using somethign like spring.cloud.function.routing-expression=headers['type']

  • ajinkyasagar
    ajinkyasagar Member Posts: 11

    Above statmement is might confusing.
    So explaning with example

    Below is example

    Producer is producing 5 messages
    Message1 header(name=mac)
    Message2 headee(name=jack)
    Message3 header(name=jamie)
    Message4 header(name=micky)
    Message5 header(name=amy)

    I want to deploy microswrvices for each alphabate

    E. G. If name starts with m all messgaes like micky, mac will go to that microservice.

    So if i run 3 microservice

    Microservice1 havign condition on header
    Name=m*(name starts with m)

    Will consumer Message1 and Message4

    Microservice2 havign condition on header
    Name=j*(name starts with m)

    Will consumer Message2 and Message3

    Microservice3 havign condition on header
    Name=a*(name starts with m)

    Will consumer Message5

    How i can achieve this filter based on header in spring solace streaming?
    I wnat to mention pattern for each microservice as config in application.yaml

  • ajinkyasagar
    ajinkyasagar Member Posts: 11
    edited October 4 #7

    @marc did you get chance to check on above example?

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 568 admin
    edited October 4 #8

    Hi @ajinkyasagar,

    Hope you had a great weekend!

    I think there are multiple options and the option you choose really depends on how complex your filtering rules are, how important it is to be able to scale up/down the individual handlers, and how important it is for you to use spring cloud stream.

    Here are the options as I see them:

    1. Create a topic republisher that reads all the messages and publishes them back out on more detailed topic hierarchy which includes the name or whatever fields you want to filter on. This would then allow you to have each microservice subscribe using solace topic wildcards * and > to filter on what you want.
      The upsides of this option: From a broker performance perspective this is great as the broker excels at topic routing. It also makes the messages available for other apps to consume them on a more well-defined topic hierarchy if necessary and allows you to create separate, independent microservices that consume your messages that can easily be scaled up/down. This can be done using Spring Cloud Stream only if that is preferred.
      The down side here is that you are obviously now running a separate app that consumes all the messages and republishes them which takes extra resources. Another down side might be that topic routing may not handle your filtering needs; topic routing will handle your example of filtering on names that start with a certain letter, but if you need to do math it won't help you there.

    2. You use Spring Cloud Stream w/ the routing function as I recommended above.
      Upsides: You can use SPEL for filtering so it can handle more complexity than topic routing, you're using Spring Cloud Stream for simpler development, the broker doesn't have to execute selector processing.
      Downside: As far as I know you can't scale the functions that the routing function calls individually so you'd have to scale the entire app up/down. This is the downside you referenced above as well...

    3. You don't use Spring Cloud Stream, but instead if you still want to use Spring you could use our Spring Boot Java JCSMP or JMS Starters. These will allow you to use Selectors which allow you to do filtering as you wish.
      Upside: JCSMP/JMS allow you to define your selectors per microservice so you get your desired filtering and ability to scale your microservices up/down independently.
      Downsides: Broker has to process selectors which is not as performant as topic routing, but this may not matter depending on your use case. You're not using Spring Cloud Stream so development may be a bit more complex.

    Sorry it's not an easy answer. There are quite a few variables here ;)

  • ajinkyasagar
    ajinkyasagar Member Posts: 11

    Thanks for your answer ...we are trying option 2...

  • sakshi_123
    sakshi_123 Member Posts: 7

    Hi @marc tried option 2 approach, added event routing as a property in application properties (did not route to separate functions since need filtering only on my consumer function)
    added : spring.cloud.function.routing-expression: headers['System']=='abc'

    But the data is still not getting filtered. Can you please advise ?
    If needed we can arrange a quick call as well to review.

  • sakshi_123
    sakshi_123 Member Posts: 7

    also tried: SpringApplication.run(SolaceStreamingExampleApplication.class, "--spring.cloud.function.routing-expression="

    • "headers['System']== 'abc' ? 'myFunction' : 'myFunction2'");

    where myFunction: consumer function
    myFunction2: empty function

    but even this is not filtering

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 568 admin

    Hi @sakshi_123,

    I actually haven't played with this capability previously myself. I tried it out some today and added a working app in this repo: https://github.com/Mrc0113/scs-routing-example

    A few things of note:

    • You'll want to make sure you are configuring the functionRouter-in-0 binding to receive the messages that you want. I do not see a way to override that name
    • My example routes on the "solace_correlationId" header purely out of convenience. You can set this in the Try-Me tab of the PubSub+ Manager, but of course you can use a different header if you'd like. If you set it to cons1 it will execute the consumer1 method, otherwise it executes the consumer2 one.
    • If you substitute the Consumer for a Function because you have outbound messages then it looks like it they all actually go out the functionRouter-out-0 binding so you can either send them all to one place, or you can use one of the dynamic destination publishing options defined in section 6 of this codelab

    Hope that helps!

  • sakshi_123
    sakshi_123 Member Posts: 7
    edited October 8 #13

    Hi @marc, so my use case is bit different. I want to route to different web apps and not different functions hence i dont want my routing to be function based.

  • sakshi_123
    sakshi_123 Member Posts: 7
    edited October 8 #14

    Also in the future I wish to filter on ID (such that if ID starts with 1,2,3 -> route to web app 1, and if ID starts with 4,5,6 route to web app 2)
    I also want messages in same sequence that they re published

    Is this approach possible to implement currently? without binding to functions in a single web app?

  • sakshi_123
    sakshi_123 Member Posts: 7
    edited October 8 #15

    This is my current .yml . Please also note that my queue is exclusive.

     spring:  
        cloud: 
          function: 
            definition: myFunction
            routing:
              enabled: true
    
    stream:
      poller:
        fixed-delay: 1000
      binders:  
      #This section of the configuration tells the solace binder how to connect to the solace event broker/mesh 
        local-solace:  
          type: solace 
          environment: 
            solace: 
              java: 
    
    
      bindings:  
      #The bindings section is used to define your input and output channels. 
        myFunction-in-0: 
          destination: Test
          routing-expression: headers['System']== 'abc' ? 'myFunction'  
          group: exclusive
    
    
    
      solace: 
      #The solace bindings section allows for solace specific configurations to be applied to a channel. A common example is adding topic subscriptions to a queue as shown below. 
        bindings: 
          myFunction-in-0: 
            consumer: 
              autoBindErrorQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              queueAdditionalSubscriptions: testing/test1
              queue-access-type: 1
    
  • arpit_k
    arpit_k Member Posts: 2

    Hi Team,

    **Need some input on spring cloud streaming with solace for which we are streaming a queues and want to have a batch operation to control the no of messages ,batch size and sequence of events what will be the configuration if wee need to progress as i can see for kakfa itis max.poll.records, will below configuration as ok or do we need some else

    queueMaxMsgSize
    Maximum message size for the consumer group queue.

    queueQuota
    Message spool quota for the consumer group queue.

    Thanks in advance
    Arpit

  • Tamimi
    Tamimi Member, Administrator, Employee Posts: 228 admin

    Hey @arpit_k - looks like your question might not be related to this post (filter messages based on header). If that's the case can you please start a new discussion post over here https://solace.community/post/discussion and make sure to include the tags as well :)

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 568 admin

    @sakshi_123 said:
    Hi @marc, so my use case is bit different. I want to route to different web apps and not different functions hence i dont want my routing to be function based.

    Hi @sakshi_123,

    Looking at this I think the cleanest option would be to go with option #1 that I mentioned previously. This was the option that started with "Create a topic republisher that reads all the messages and publishes them back out on more detailed topic hierarchy". By going that route you can put the items that you'd like to filter on now, and think you might need to filter on in the future, into the topic hierarchy of the republished messages. This will allow you to filter on those items in your topic subscriptions (you can have multiple per app) and avoid having an app that needs to act as an orchestrator between your different web apps. This will reduce coupling, and allow the Event Broker to take on the responsibility of maintaining order across your events.