Spring Cloud Stream concurrency

Hi there
I’m trying to configure more than one consumer thread for my functional consumer. The YAML file looks like this:

cloud:
  stream:
    function:
      definition: bookIn
    bindings:
      bookIn-in-0:
        destination: BOOK
        group: BOOK_CONSUMER
        consumer:
          concurrency: 13 # has no effect

The value does end up in the ConsumerProperties, but the messages are always consumed on the same thread (Context_1_ConsumerDispatcher).
What am I missing?
Thanks a lot.
Cheers
Mike
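
For anyone wanting to check which threads a functional consumer actually runs on, here is a minimal sketch in plain Java. It is not binder code: the class ConsumerThreadProbe and the deliver method are hypothetical stand-ins, with a fixed thread pool playing the role of the binder. In a real application bookIn would be a Spring bean rather than a static method. The consumer simply records the name of the thread it was called on.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ConsumerThreadProbe {

    // Thread names seen by the consumer; a concurrent set because several threads may write to it.
    static final Set<String> seenThreads = ConcurrentHashMap.newKeySet();

    // Same shape as the functional consumer named in the YAML (assumption: a Consumer<String>).
    static Consumer<String> bookIn() {
        return payload -> seenThreads.add(Thread.currentThread().getName());
    }

    // Hypothetical stand-in for the binder: hands `messages` payloads to the consumer from `workers` threads.
    static Set<String> deliver(int workers, int messages) throws InterruptedException {
        seenThreads.clear();
        Consumer<String> consumer = bookIn();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < messages; i++) {
            String payload = "book-" + i;
            pool.submit(() -> consumer.accept(payload));
        }
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return new HashSet<>(seenThreads);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("1 worker:  " + deliver(1, 100));
        System.out.println("4 workers: " + deliver(4, 100));
    }
}
```

If the set only ever contains one name while the concurrency setting is greater than 1, the setting is not being applied, which matches the behaviour described above.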

Hi @Mike13,
Unfortunately, the Solace binder does not currently support the concurrency consumer property. That said, we hope to support it soon, and an issue was opened just last week where it is being discussed: SCSt Binder Doesn't Seem to support concurrency Consumer Property · Issue #7 · SolaceProducts/solace-spring-cloud · GitHub

-Marc

Hi Marc
Thank you!
Best regards
Mike

Hey @Mike13,
Just a heads up that support for concurrency will be in the next release and a PR is available here if you want to check it out before it’s officially released.

-Marc

Hi @Mike13,
Just a heads up that support for concurrency has been released as part of the SCSt Binder. More release info is here.

Hi @marc
Great, thank you. I just tested it. There are several consumers now.
But the com.solace.spring.cloud.stream.binder.inbound.InboundXMLMessageListener#receive loop seems to eat my CPU :-(
What can I do about that?
Best regards
Mike

Thanks for the heads up @Mike13. I’ll check and get back to you.

Hi @Mike13,
Nice catch, that’s definitely an issue!
Can you try using 1.1.1-SNAPSHOT from this branch and see if it looks good to you? I tested it locally and it seems to fix the issue for me.

Appreciate you finding and raising this!

Note that it’s being tracked here: Cloud Stream Binder CPU Usage Spike · Issue #20 · SolaceProducts/solace-spring-cloud · GitHub and the developer is still looking into a tangential issue.

Hi @marc
It looks good now :) (concerning the CPU spikes). But when I shut down my application, these warnings occur:

2020-07-23 18:06:16.504 WARN 15928 --- [pool-3-thread-2] c.s.s.c.s.b.i.InboundXMLMessageListener : Received error while trying to read message from endpoint tms/iad/warnapp/DEV/analytics/v1/algorithm/request.warnapp-analysis

com.solacesystems.jcsmp.ClosedFacilityException: Tried to call receive on a stopped message consumer.
    at com.solacesystems.jcsmp.impl.flow.FlowHandleImpl.throwClosedException(FlowHandleImpl.java:1905) ~[sol-jcsmp-10.8.1.jar:na]

It would be nice if there was a proper shutdown mechanism.
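
To illustrate what is meant by a proper shutdown, here is a generic sketch of a worker loop that can be stopped cleanly. This is not the binder's implementation and makes no claim about how the issue was fixed there: PollingWorker and demo are hypothetical names, and a BlockingQueue stands in for the message source. The loop uses a blocking poll with a timeout, so an idle worker sleeps instead of spinning, and it rechecks a stop flag so it never asks for another message after stop() has been called.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class PollingWorker implements Runnable {
    private final BlockingQueue<String> source;                   // stand-in for the message source
    private final AtomicBoolean running = new AtomicBoolean(true);
    final AtomicInteger handled = new AtomicInteger();            // number of messages processed

    PollingWorker(BlockingQueue<String> source) {
        this.source = source;
    }

    @Override
    public void run() {
        while (running.get()) {
            try {
                // Blocks for up to 100 ms while the queue is empty, then rechecks the stop flag.
                String msg = source.poll(100, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    handled.incrementAndGet();
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and leave the loop.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Asks the loop to exit after its current poll returns.
    void stop() {
        running.set(false);
    }

    // Small demo: process three messages, stop the worker, and wait for its thread to finish.
    static int demo() throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        PollingWorker worker = new PollingWorker(queue);
        Thread thread = new Thread(worker, "demo-consumer");
        thread.start();
        queue.put("a");
        queue.put("b");
        queue.put("c");
        while (worker.handled.get() < 3) {
            Thread.sleep(10);
        }
        worker.stop();
        thread.join(2000);
        return thread.isAlive() ? -1 : worker.handled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("handled before clean stop: " + demo());
    }
}
```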

@Mike13 Haha, yep, that’s the tangential issue that I mentioned before. It’s being cleaned up :)
I think the developer just pushed a fix to that same branch, but I haven’t tested it yet.

Hi @Mike13,
Heads up that the CPU spike and the errors when stopping the app have both been fixed as part of the v1.1.1 release. It’s been pushed to Maven Central, so we’re just waiting for their systems to sync. (If only they used Solace to be real-time :P)

-Marc

@marc.dipasquale Can you please advise how to set up concurrency for a reactive implementation? Currently, if I set a concurrency value greater than 1, I get a startup failure (“Concurrency > 1 is not supported by reactive consumer, given that project reactor maintains its own concurrency mechanism”). Please advise.

Hi @sarabojieie, I’m wondering whether the consumers are created in an anonymous group. An anonymous group does not support concurrency; configuring a consumer group would help. Let us know!