Subscription concurrency to speed up data consumption

manzarul Member Posts: 16

Hi,
In Solace I pre-configured a non-exclusive queue, and I am now trying to consume data from it in a Spring Cloud Stream app by setting spring.cloud.stream.bindings.fn-name.consumer.concurrency=4. In the logs I can see it trying to start four subscribers, but all four time out. If we set this number to 1 or remove the property, the app is able to connect and consume data. The reason for starting multiple consumers is to consume the queue data in parallel, so that consumption is faster.
At the same time, if we let Spring Cloud Stream create a new queue and bind four consumers to it, the binding succeeds.

Here is my application.yaml

spring:
  cloud:
    function:
      definition: myConsumer
    stream:
      bindings:
        myConsumer-in-0:
          destination: test-fu-q1
          group: nonexclusive
          consumer:
            concurrency: 4

      # Note: if we uncomment the block below and keep the settings above, it does not work.
      # If we change concurrency to 1 (or remove it) and uncomment the block below, it works.
      #solace:
        #bindings:
          #myConsumer-in-0:
            #consumer:
              #provisionSubscriptionsToDurableQueue: false
              #provisionDurableQueue: false
              #queueNamePrefix: ""
              #useFamiliarityInQueueName: false
              #useDestinationEncodingInQueueName: false
              #useGroupNameInQueueName: false
              #concurrency: 4  # this value is not being considered

If I refer to point 3 under Consumer Concurrency, it seems that concurrency > 1 works only for push-based subscriptions, not pull-based ones, and that push-based means an endpoint needs to be registered in the Solace console for the messages to be pushed to.

I need your help to confirm this behavior, or to tell me whether I am doing something wrong.

Comments

  • marc Member, Administrator, Moderator, Employee Posts: 568 admin

    Hi @manzarul,

    Interesting error. Note that concurrency should be set the way you have it first in your config (under the Spring Cloud Stream binding, not the Solace-specific section). That said, it feels to me like something else is the issue. This exception is weird to me. Any chance your app is running out of memory or anything like that?

    com.solacesystems.jcsmp.JCSMPTransportException: (JCSMPTransportException) Error receiving data from underlying connection.
        at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel$ClientChannelReconnect.call(TcpClientChannel.java:2320) ~[sol-jcsmp-10.10.0.jar:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
    Caused by: com.solacesystems.jcsmp.JCSMPTransportException: Timeout reading response to ADCTRL operation. ([BRT resource=test-fu-q1 [email protected]8366 flowType=CONSUMER counter=12])
        at com.solacesystems.jcsmp.impl.flow.BindRequestTask.handleTimeout(BindRequestTask.java:402) ~[sol-jcsmp-10.10.0.jar:na]
        at com.solacesystems.jcsmp.impl.timers.impl.JCSMPTimerQueueImpl.runAllTo(JCSMPTimerQueueImpl.java:91) ~[sol-jcsmp-10.10.0.jar:na]
        at com.solacesystems.jcsmp.protocol.nio.impl.SyncEventDispatcherReactor.eventLoop(SyncEventDispatcherReactor.java:168) ~[sol-jcsmp-10.10.0.jar:na]
        at com.solacesystems.jcsmp.protocol.nio.impl.SyncEventDispatcherReactor$SEDReactorThread.run(SyncEventDispatcherReactor.java:338) ~[sol-jcsmp-10.10.0.jar:na]
        ... 1 common frames omitted
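
    A minimal sketch of the arrangement described in the note above, assuming the binding name and the Solace consumer properties quoted in the original post (it has not been verified against a broker): concurrency sits on the core Spring Cloud Stream binding, and the Solace-specific section carries only the queue-provisioning switches.

    ```yaml
    spring:
      cloud:
        function:
          definition: myConsumer
        stream:
          bindings:
            myConsumer-in-0:
              destination: test-fu-q1
              group: nonexclusive
              consumer:
                concurrency: 4        # core Spring Cloud Stream consumer property
          solace:
            bindings:
              myConsumer-in-0:
                consumer:
                  # Property names copied from the original post; check them
                  # against the binder version in use.
                  provisionDurableQueue: false
                  provisionSubscriptionsToDurableQueue: false
                  queueNamePrefix: ""
                  useFamiliarityInQueueName: false
                  useDestinationEncodingInQueueName: false
                  useGroupNameInQueueName: false
    ```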
    
  • manzarul Member Posts: 16

    There is no memory issue. I ran three instances on my system by changing the port number, and all three ran without any issue. But it does have issues if we set concurrency > 1.
    Can you see any issues in the application.yaml file?

  • Aaron Member, Moderator, Employee Posts: 285 Solace Employee
    edited July 25 #4

    That is an interesting exception.... transport exception due to an AD control message timeout. And you say this works just fine with concurrency == 1. But not 2. But if you set it to 2, you see 2 subscribers start, but then timeout. On the Solace broker, do you see multiple CLIENT_CONNECT log messages (in the event.log), or just a single CONNECT, and then multiple BIND events (i.e. the same connection connecting multiple consumers to the queue)?

    I'm wondering if there is some kind of thread/concurrency issue with multiple (Spring) consumers deadlocking/starving some internal API thread for sending/receiving of heartbeat data.

    Any chance you could provide us with full working code/project to test..?

    EDIT/UPDATE:

    Just looked at the log file in a bit more detail... seems like 1 consumer, and then connecting multiple consumers/flows to the queue. And then 2nd consumer hangs until throwing an exception:

    2021-07-19 14:22:17.143  INFO 16568 --- [           main] c.s.j.protocol.impl.TcpClientChannel     : Connected to host 'orig=tcps://URL, scheme=tcps://, host=URL, port=55443' (smfclient 1)
    2021-07-19 14:22:17.278  INFO 16568 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Caching the binder: solace-cloud
    2021-07-19 14:22:17.278  INFO 16568 --- [           main] o.s.c.s.binder.DefaultBinderFactory      : Retrieving cached binder: solace-cloud
    2021-07-19 14:22:17.360  INFO 16568 --- [           main] c.s.s.c.s.b.p.SolaceQueueProvisioner     : Creating durable queue test-fu-q1 for consumer group nonexclusive
    2021-07-19 14:22:17.361  INFO 16568 --- [           main] c.s.s.c.s.b.p.SolaceQueueProvisioner     : Durable queue provisioning is disabled, test-fu-q1 will not be provisioned nor will its configuration be validated
    2021-07-19 14:22:17.361  INFO 16568 --- [           main] c.s.s.c.s.b.p.SolaceQueueProvisioner     : Testing consumer flow connection to queue test-fu-q1 (will not start it)
    2021-07-19 14:22:17.538  INFO 16568 --- [           main] c.s.s.c.s.b.p.SolaceQueueProvisioner     : Connected test consumer flow to queue test-fu-q1, closing it
    2021-07-19 14:22:17.542  INFO 16568 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'test-fu-q1.nonexclusive.errors' has 1 subscriber(s).
    2021-07-19 14:22:17.543  INFO 16568 --- [           main] o.s.c.stream.binder.BinderErrorChannel   : Channel 'test-fu-q1.nonexclusive.errors' has 2 subscriber(s).
    2021-07-19 14:22:17.547  INFO 16568 --- [           main] c.s.s.c.s.b.i.JCSMPInboundChannelAdapter : Creating 4 consumer flows for queue test-fu-q1 <inbound adapter 6750ea97-7e59-4ea9-90d9-b1d1eb40ae84>
    2021-07-19 14:22:17.547  INFO 16568 --- [           main] c.s.s.c.s.b.i.JCSMPInboundChannelAdapter : Creating consumer 1 of 4 for inbound adapter 6750ea97-7e59-4ea9-90d9-b1d1eb40ae84
    2021-07-19 14:22:17.548  INFO 16568 --- [           main] c.s.s.c.s.b.util.FlowReceiverContainer   : Binding flow receiver container a349f688-f81f-48c0-a1a2-1cedd0dabf8e
    2021-07-19 14:22:17.618  INFO 16568 --- [           main] c.s.s.c.s.b.i.JCSMPInboundChannelAdapter : Creating consumer 2 of 4 for inbound adapter 6750ea97-7e59-4ea9-90d9-b1d1eb40ae84
    2021-07-19 14:22:17.619  INFO 16568 --- [           main] c.s.s.c.s.b.util.FlowReceiverContainer   : Binding flow receiver container 8823d65d-c6dc-4a64-bb88-bde0f4cee39c
    
    *** 5 seconds later
    
    2021-07-19 14:22:23.221  INFO 16568 --- [1_ReactorThread] c.s.j.p.n.impl.SubscriberMessageReader   : (Client name: ITSEHBG-DN0202/16568/#00130001/GIPDvkrS5E   Local addr: 10.61.243.253 Local port: 53303   Remote addr: URL  Remote port: 55443) - SubscriberMessageReader:read Exception
    
    java.io.IOException: An existing connection was forcibly closed by the remote host
    

    All the logs are coming from the main thread, but then a lot of the exceptions that follow this are coming from "reactor thread", "connect service", "scheduling-1" (I don't recognize that thread name as Solace). But also everything is at INFO level.

    Any chance of turning the logging up to DEBUG and re-running?
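
    If it helps, DEBUG logging can be enabled per package through Spring Boot's logging.level properties. A sketch, assuming the logger packages are the ones abbreviated in the log lines above (com.solacesystems.jcsmp for the JCSMP API, com.solace.spring.cloud.stream.binder for the binder):

    ```yaml
    logging:
      level:
        # Package names inferred from the abbreviated logger names in the INFO log.
        com.solacesystems.jcsmp: DEBUG
        com.solace.spring.cloud.stream.binder: DEBUG
    ```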

  • manzarul Member Posts: 16

    Sure, I will provide the debug log and the code base as well shortly.

  • manzarul Member Posts: 16

    Please find the logs attached:
    1. output-debug.log (logs captured in debug mode)
    2. output-info.log (logs captured in info mode)
    3. output-with-concurrency-1.log (concurrency set to 1; this works as expected)

  • marc Member, Administrator, Moderator, Employee Posts: 568 admin

    Hi @manzarul,

    Interesting. Neither I nor our support team were able to reproduce this issue. Any chance you can try with our "cloud-stream-sink" samples from here: https://github.com/SolaceSamples/solace-samples-spring and see if that has the same behavior?

    That might help us narrow down if it's an environment issue or some issue with the app itself.

    Hope that helps!

  • manzarul Member Posts: 16

    I updated the Spring Boot parent version, and then concurrency started working.
    Current pom:

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.4-SNAPSHOT</version>
    </parent>

    The old version was 2.5.3-SNAPSHOT.