Spring Cloud Stream concurrency

Mike13
Mike13 Member Posts: 29
edited June 2020 in General Discussions #1

Hi there
I'm trying to configure more than one consumer thread for my functional consumer. The YAML file looks like this:

spring:
  cloud:
    stream:
      function:
        definition: bookIn
      bindings:
        bookIn-in-0:
          destination: BOOK
          group: BOOK_CONSUMER
          consumer:
            concurrency: 13 # has no effect
 

The value does make it into the ConsumerProperties, but the messages are always consumed on the same thread (Context_1_ConsumerDispatcher).
What am I missing?
Thanks a lot.
Cheers
Mike
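One way to check whether a concurrency setting takes effect is to have the consumer record the name of the thread it runs on. The sketch below is plain Java and only simulates the dispatch: the fixed thread pool is a stand-in for the binder, and `ThreadProbe` and `distinctThreads` are hypothetical names used for illustration. In the real application the `bookIn` consumer would be exposed as a Spring bean and the binder would call it.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ThreadProbe {
    // Thread names seen so far; a concurrent set because several threads may write at once.
    static final Set<String> seenThreads = ConcurrentHashMap.newKeySet();

    // Same shape as the functional consumer named in the YAML (definition: bookIn).
    // Assumption: in the Spring app this would be returned from a @Bean method.
    static Consumer<String> bookIn() {
        return payload -> seenThreads.add(Thread.currentThread().getName());
    }

    // Stand-in for the binder: hands `messages` payloads to the consumer
    // on a pool of `concurrency` threads and reports how many threads were used.
    static int distinctThreads(int concurrency, int messages) {
        seenThreads.clear();
        Consumer<String> consumer = bookIn();
        ExecutorService pool = Executors.newFixedThreadPool(concurrency);
        for (int i = 0; i < messages; i++) {
            String payload = "book-" + i;
            pool.execute(() -> consumer.accept(payload));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("dispatch did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seenThreads.size();
    }

    public static void main(String[] args) {
        System.out.println("concurrency 1  -> " + distinctThreads(1, 50) + " thread(s)");
        System.out.println("concurrency 13 -> " + distinctThreads(13, 50) + " thread(s)");
    }
}
```

If the same probe inside the real consumer only ever reports one thread name, as in the post above, the setting is not being applied by the binder.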

Comments

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @Mike13,
    Unfortunately, the Solace binder does not currently support the concurrency consumer property. That said, we hope to add it soon; an issue was opened just last week where it is being discussed: https://github.com/SolaceProducts/solace-spring-cloud/issues/7

    -Marc

  • Mike13
    Mike13 Member Posts: 29

    Hi Marc
    Thank you!
    Best regards
    Mike

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin
    edited June 2020 #4

    Hey @Mike13,
    Just a heads up that support for concurrency will be in the next release and a PR is available here if you want to check it out before it's officially released.
    https://github.com/SolaceProducts/solace-spring-cloud/pull/8
    -Marc

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @Mike13,
    Just a heads up that support for concurrency has been released as part of the SCSt Binder. More release info is here

  • Mike13
    Mike13 Member Posts: 29

    Hi @marc
    Great, thank you. I just tested it. There are several consumers now.
    But the com.solace.spring.cloud.stream.binder.inbound.InboundXMLMessageListener#receive loop seems to eat my CPU :-(.
    What can I do about that?
    Best regards
    Mike
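For readers who hit the same symptom: one common cause of a receive loop burning CPU is polling without blocking. The sketch below is a generic illustration using a `java.util.concurrent` queue. It is not the binder's code, and the thread does not say what the actual root cause in InboundXMLMessageListener was. `ReceiveLoopDemo` and `iterations` are hypothetical names.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class ReceiveLoopDemo {
    // Counts how often the loop body runs over an empty queue in roughly runMillis.
    // waitMillis == 0: non-blocking poll, so the loop spins and keeps a core busy.
    // waitMillis  > 0: timed poll, so the thread is parked between checks.
    static long iterations(long waitMillis, long runMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(runMillis);
        long count = 0;
        try {
            while (deadline - System.nanoTime() > 0) {
                String msg = (waitMillis == 0)
                        ? queue.poll()
                        : queue.poll(waitMillis, TimeUnit.MILLISECONDS);
                if (msg != null) {
                    // A real listener would hand the message to the consumer here.
                }
                count++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("busy poll iterations:  " + iterations(0, 200));
        System.out.println("timed poll iterations: " + iterations(50, 200));
    }
}
```

The busy variant runs the loop body many orders of magnitude more often than the timed one for the same wall-clock time, which is what shows up as a pegged core in a profiler.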

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Thanks for the heads up @Mike13. I'll check and get back to you.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @Mike13,
    Nice catch, that's definitely an issue!
    Can you try using 1.1.1-SNAPSHOT from this branch and see if it looks good to you? I tested it locally and it seems to fix the issue for me.

    Appreciate you finding and raising this!

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Note that it's being tracked here: https://github.com/SolaceProducts/solace-spring-cloud/issues/20 and the developer is still looking into a tangential issue.

  • Mike13
    Mike13 Member Posts: 29
    edited July 2020 #10

    Hi @marc
    It looks good now :-) (concerning the CPU spikes). But when I end my application these warnings occur:

    2020-07-23 18:06:16.504 WARN 15928 --- [pool-3-thread-2] c.s.s.c.s.b.i.InboundXMLMessageListener : Received error while trying to read message from endpoint tms/iad/warnapp/DEV/analytics/v1/algorithm/request.warnapp-analysis

    com.solacesystems.jcsmp.ClosedFacilityException: Tried to call receive on a stopped message consumer.
    at com.solacesystems.jcsmp.impl.flow.FlowHandleImpl.throwClosedException(FlowHandleImpl.java:1905) ~[sol-jcsmp-10.8.1.jar:na]

    It would be nice if there was a proper shutdown mechanism.
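A typical pattern for this kind of shutdown is to set a "stopping" flag before unblocking the receive call, so the loop can tell an expected stop apart from a real error and exit without logging a warning. The sketch below shows that pattern in plain Java. It is an assumption-laden illustration, not the binder's or JCSMP's implementation: `StoppableReceiver` is a hypothetical class, the queue stands in for the message flow, and `InterruptedException` plays the role that `ClosedFacilityException` plays in the log above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class StoppableReceiver {
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();
    private final AtomicBoolean stopping = new AtomicBoolean(false);
    // Counts the cases that would deserve a WARN line in a real listener.
    final AtomicInteger warnings = new AtomicInteger();
    private final Thread worker = new Thread(this::loop, "receiver");

    void start() {
        worker.start();
    }

    private void loop() {
        while (!stopping.get()) {
            try {
                String msg = queue.take(); // blocks until a message arrives or the thread is interrupted
                // A real listener would hand msg to the consumer here.
            } catch (InterruptedException e) {
                if (!stopping.get()) {
                    warnings.incrementAndGet(); // interrupted while still supposed to run
                }
                return; // either way, leave the loop
            }
        }
    }

    // Sets the flag first and interrupts second, so the loop sees an expected stop.
    // Returns true if the worker thread ended within joinMillis.
    boolean stop(long joinMillis) {
        stopping.set(true);
        worker.interrupt();
        try {
            worker.join(joinMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        StoppableReceiver receiver = new StoppableReceiver();
        receiver.start();
        System.out.println("stopped cleanly: " + receiver.stop(2000));
        System.out.println("warnings: " + receiver.warnings.get());
    }
}
```

The ordering in `stop` is the design choice that matters: if the receive call were unblocked before the flag was set, the loop could not distinguish shutdown from failure and would log the warning.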

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin
    edited July 2020 #11

    @Mike13 Haha, yep that's the tangential issue that I mentioned before. That's being cleaned up :)
    I think he just pushed a fix to that same branch but I haven't tested it yet.

  • marc
    marc Member, Administrator, Moderator, Employee Posts: 914 admin

    Hi @Mike13,
    Heads up that the CPU spike and the errors when stopping the app have both been fixed in the v1.1.1 release. It's been pushed to Maven Central, so we're just waiting for their systems to sync. (If only they used Solace to be real-time :tongue: )

    https://github.com/SolaceProducts/solace-spring-cloud/releases/tag/1.1.1
    -Marc

  • sarabojieie
    sarabojieie Member Posts: 4

    @marc Can you please advise how to set up concurrency for a reactive implementation? Currently, if I set a concurrency value greater than 1, I get this startup failure: ("Concurrency > 1 is not supported by reactive consumer, given that project reactor maintains its own concurrency mechanism"). Please advise.

  • giri
    giri Member, Administrator, Employee Posts: 103 admin

    Hi @sarabojieie, I'm wondering whether the consumers are being created in an anonymous group. Anonymous groups do not support concurrency; configuring a consumer group would help. Let us know!