Subscription concurrency to speed up data consumption
Hi,
In Solace I pre-configured a non-exclusive queue and am now trying to consume data from it in a Spring Cloud Stream app by setting spring.cloud.stream.bindings.fn-name.consumer.concurrency=4. In the logs I can see it trying to start four subscribers, but all four time out. If I set this number to 1 or remove the property, the app is able to connect and consume data. The reason for starting multiple consumers is to consume the queue's data in parallel, for faster consumption.
At the same time, if I let Spring Cloud Stream create a new queue and bind four consumers to it, the binding succeeds.
Here is my application.yaml
spring:
  cloud:
    function:
      definition: myConsumer
    stream:
      bindings:
        myConsumer-in-0:
          destination: test-fu-q1
          group: nonexclusive
          consumer:
            concurrency: 4
      # Note: if we uncomment the section below and try with the same settings above, then it's not working;
      # but if we change concurrency to 1 (or just remove it) and uncomment the section below, then it's working.
      #solace:
      #  bindings:
      #    myConsumer-in-0:
      #      consumer:
      #        provisionSubscriptionsToDurableQueue: false
      #        provisionDurableQueue: false
      #        queueNamePrefix: ""
      #        useFamiliarityInQueueName: false
      #        useDestinationEncodingInQueueName: false
      #        useGroupNameInQueueName: false
      #        concurrency: 4   # this value is not being considered
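For readability, here is the commented-out variant expanded into plain YAML (every property and value is copied from the comments above, nothing new is added); this is the combination that times out whenever concurrency is greater than 1:

spring:
  cloud:
    function:
      definition: myConsumer
    stream:
      bindings:
        myConsumer-in-0:
          destination: test-fu-q1
          group: nonexclusive
          consumer:
            concurrency: 4
      solace:
        bindings:
          myConsumer-in-0:
            consumer:
              provisionSubscriptionsToDurableQueue: false
              provisionDurableQueue: false
              queueNamePrefix: ""
              useFamiliarityInQueueName: false
              useDestinationEncodingInQueueName: false
              useGroupNameInQueueName: false
              # a concurrency value placed here is not being considered (see note above)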
If I refer to point number 3 under Consumer Concurrency, it seems that concurrency > 1 will only work for push-based subscriptions, not pull-based ones; and push-based means an endpoint needs to be registered in the Solace console for the messages to be pushed to.
I need your help to confirm this behavior, or to tell me if I am doing something wrong.
Comments
-
Hi @manzarul,
Interesting error. Note that you would set concurrency the way you have it first in your config (the standard property, not the solace-specific one). That said, it feels to me like something else is the issue. This exception is weird to me. Any chance your app is running out of memory or anything like that?

com.solacesystems.jcsmp.JCSMPTransportException: (JCSMPTransportException) Error receiving data from underlying connection.
    at com.solacesystems.jcsmp.protocol.impl.TcpClientChannel$ClientChannelReconnect.call(TcpClientChannel.java:2320) ~[sol-jcsmp-10.10.0.jar:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) ~[na:na]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: com.solacesystems.jcsmp.JCSMPTransportException: Timeout reading response to ADCTRL operation. ([BRT resource=test-fu-q1 existingFH=com.solacesystems.jcsmp.impl.flow.FlowHandleImpl@7a6d8366 flowType=CONSUMER counter=12])
    at com.solacesystems.jcsmp.impl.flow.BindRequestTask.handleTimeout(BindRequestTask.java:402) ~[sol-jcsmp-10.10.0.jar:na]
    at com.solacesystems.jcsmp.impl.timers.impl.JCSMPTimerQueueImpl.runAllTo(JCSMPTimerQueueImpl.java:91) ~[sol-jcsmp-10.10.0.jar:na]
    at com.solacesystems.jcsmp.protocol.nio.impl.SyncEventDispatcherReactor.eventLoop(SyncEventDispatcherReactor.java:168) ~[sol-jcsmp-10.10.0.jar:na]
    at com.solacesystems.jcsmp.protocol.nio.impl.SyncEventDispatcherReactor$SEDReactorThread.run(SyncEventDispatcherReactor.java:338) ~[sol-jcsmp-10.10.0.jar:na]
    ... 1 common frames omitted
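To make that concrete, here is a minimal sketch of the two property paths, using the binding name from the config above; only the standard Spring Cloud Stream path carries the concurrency setting, which lines up with the note in the original post that a concurrency value under the solace-specific section is not being considered:

spring:
  cloud:
    stream:
      bindings:
        myConsumer-in-0:
          consumer:
            concurrency: 4                   # standard Spring Cloud Stream consumer property; set it here
      solace:
        bindings:
          myConsumer-in-0:
            consumer:
              provisionDurableQueue: false   # solace-specific consumer properties live under this path
              # a 'concurrency' entry here is not picked up (see "this value is not being considered" above)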
-
That is an interesting exception... a transport exception due to an AD control message timeout. And you say this works just fine with concurrency == 1, but not 2; if you set it to 2, you see 2 subscribers start, but then they time out. On the Solace broker, do you see multiple CLIENT_CONNECT log messages (in the event.log), or just a single CONNECT and then multiple BIND events (i.e. the same connection connecting multiple consumers to the queue)?
I'm wondering if there is some kind of thread/concurrency issue, with multiple (Spring) consumers deadlocking/starving some internal API thread used for sending/receiving heartbeat data.
Any chance you could provide us with a full working code/project to test?
EDIT/UPDATE:
Just looked at the log file in a bit more detail... it seems like one client connection, and then multiple consumers/flows connecting to the queue. And then the 2nd consumer hangs until it throws an exception:
2021-07-19 14:22:17.143 INFO 16568 --- [ main] c.s.j.protocol.impl.TcpClientChannel : Connected to host 'orig=tcps://URL, scheme=tcps://, host=URL, port=55443' (smfclient 1)
2021-07-19 14:22:17.278 INFO 16568 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Caching the binder: solace-cloud
2021-07-19 14:22:17.278 INFO 16568 --- [ main] o.s.c.s.binder.DefaultBinderFactory : Retrieving cached binder: solace-cloud
2021-07-19 14:22:17.360 INFO 16568 --- [ main] c.s.s.c.s.b.p.SolaceQueueProvisioner : Creating durable queue test-fu-q1 for consumer group nonexclusive
2021-07-19 14:22:17.361 INFO 16568 --- [ main] c.s.s.c.s.b.p.SolaceQueueProvisioner : Durable queue provisioning is disabled, test-fu-q1 will not be provisioned nor will its configuration be validated
2021-07-19 14:22:17.361 INFO 16568 --- [ main] c.s.s.c.s.b.p.SolaceQueueProvisioner : Testing consumer flow connection to queue test-fu-q1 (will not start it)
2021-07-19 14:22:17.538 INFO 16568 --- [ main] c.s.s.c.s.b.p.SolaceQueueProvisioner : Connected test consumer flow to queue test-fu-q1, closing it
2021-07-19 14:22:17.542 INFO 16568 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'test-fu-q1.nonexclusive.errors' has 1 subscriber(s).
2021-07-19 14:22:17.543 INFO 16568 --- [ main] o.s.c.stream.binder.BinderErrorChannel : Channel 'test-fu-q1.nonexclusive.errors' has 2 subscriber(s).
2021-07-19 14:22:17.547 INFO 16568 --- [ main] c.s.s.c.s.b.i.JCSMPInboundChannelAdapter : Creating 4 consumer flows for queue test-fu-q1 <inbound adapter 6750ea97-7e59-4ea9-90d9-b1d1eb40ae84>
2021-07-19 14:22:17.547 INFO 16568 --- [ main] c.s.s.c.s.b.i.JCSMPInboundChannelAdapter : Creating consumer 1 of 4 for inbound adapter 6750ea97-7e59-4ea9-90d9-b1d1eb40ae84
2021-07-19 14:22:17.548 INFO 16568 --- [ main] c.s.s.c.s.b.util.FlowReceiverContainer : Binding flow receiver container a349f688-f81f-48c0-a1a2-1cedd0dabf8e
2021-07-19 14:22:17.618 INFO 16568 --- [ main] c.s.s.c.s.b.i.JCSMPInboundChannelAdapter : Creating consumer 2 of 4 for inbound adapter 6750ea97-7e59-4ea9-90d9-b1d1eb40ae84
2021-07-19 14:22:17.619 INFO 16568 --- [ main] c.s.s.c.s.b.util.FlowReceiverContainer : Binding flow receiver container 8823d65d-c6dc-4a64-bb88-bde0f4cee39c
*** 5 seconds later
2021-07-19 14:22:23.221 INFO 16568 --- [1_ReactorThread] c.s.j.p.n.impl.SubscriberMessageReader : (Client name: ITSEHBG-DN0202/16568/#00130001/GIPDvkrS5E Local addr: 10.61.243.253 Local port: 53303 Remote addr: URL Remote port: 55443) - SubscriberMessageReader:read Exception java.io.IOException: An existing connection was forcibly closed by the remote host
All of these logs are coming from the main thread, but a lot of the exceptions that follow are coming from "reactor thread", "connect service", and "scheduling-1" (I don't recognize that thread name as Solace). But also, everything is at INFO level. Any chance of turning the logging up to DEBUG and re-running?
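For reference, a minimal application.yaml sketch for raising the log level; the package names below are inferred from the abbreviated logger names in the snippet above (c.s.j.* and c.s.s.c.s.b.*) and may need adjusting for your setup:

logging:
  level:
    com.solacesystems.jcsmp: DEBUG                  # JCSMP API (the c.s.j.* loggers above)
    com.solace.spring.cloud.stream.binder: DEBUG    # Solace binder (the c.s.s.c.s.b.* loggers above)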
-
Please find the logs attached:
1. output-debug.log (logs captured in debug mode)
2. output-info.log (logs captured in info mode)
3. output-with-concurrency-1.log (when concurrency is set to 1; this works as expected)
-
Hi @manzarul,
Interesting. Neither I nor our support team was able to reproduce this issue. Any chance you can try with our "cloud-stream-sink" samples from here: https://github.com/SolaceSamples/solace-samples-spring and see if that has the same behavior?
That might help us narrow down if it's an environment issue or some issue with the app itself.
Hope that helps!