Message throttling from a Solace queue
Hi,
I am running 2 instances of my application, each with 3 message consumers consuming from the same queue, using solace-spring-cloud-bom version 1.1.1. That is 6 consumers in total, 3 per instance.
My issue is that when I publish a large number of messages to this queue, they are not distributed equally across the 2 instances. Most of the messages are consumed by the 3 consumers of one particular instance. I have a separate, predefined TaskExecutor thread pool that processes each message after it is consumed. Since most of the messages arrive at this one instance, overall processing of the full set of messages is delayed. Is there any way to control this so that, when the TaskExecutor threads are busy, the queue consumer does not accept any more messages and the messages go to the other instance of the same application instead?
Any suggestion is highly appreciated.
Thanks
Comments
-
@smpapas, you state "Most of the messages". Just confirming that some messages are getting to the 2nd instance of the application. Can you confirm that the queue's Access Type is set to Non-Exclusive (Exclusive is the default)? Also, can you confirm in the web admin console that you have 6 client connections?
1 -
I can confirm that there are 6 client connections in the admin console.
Messages are also received by both instances, but instance one receives far more than instance two. The split is roughly 90% to 10%.
An instance should not get any more messages until it has finished processing the messages it is currently working on.
0 -
Hi @smpapas 👋
Are you using the Spring Cloud Stream Binder, or other Solace-Spring capabilities? If you are using the binder, can you share your binding configuration? If something else, can you share where you are setting up your session/consumers? Also, as @RobO mentioned, you should verify that the "Access Type" of your queue is "Non-Exclusive" just to be sure.
0 -
Access type is "Non-Exclusive". I am using the Solace Spring Cloud Stream binder. Here are the configs:
spring:
  cloud:
    stream:
      function:
        definition: processMsg
      bindings:
        processMsg-in-0:
          destination: my.group
          group: msg.events.dev2
          consumer:
            concurrency: 3
            maxAttempts: 100
            backOffInitialInterval: 20000
            backOffMaxInterval: 120000
0 -
Hi @smpapas ,
Interesting that you're seeing this. Both @RobO and I tried to reproduce this behavior, but for us the messages are properly round-robined across all 6 consumers (concurrency of 3 on each of 2 instances). Can you verify that you're using solace-spring-cloud-bom v1.1.1? I basically just told my function to sleep for a bit, and in PS+ Manager I can see that unacked messages are evenly distributed across the flows as well.
0 -
Hi @smpapas,
Sure, to test this out I just wrote a simple consumer to simulate the business logic taking some processing time:
@Bean
public Consumer<String> uppercase() {
    return v -> {
        System.out.println("Received a msg on thread: " + Thread.currentThread().toString());
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    };
}
0 -
I was doing it the same way earlier. My entire business logic (including DB queries and other logic) was also done by the same consumer thread. After running it in prod, though, once in a while this consumer thread got stuck for 5 or 10 minutes in the middle of the business logic processing. I thought it was related to the consumer thread doing all of this business logic, so now I am planning to use another thread from a predefined thread pool (TaskExecutor) for it. Here I pass the message from the consumer thread to a thread from the ThreadPoolTaskExecutor, which does the business logic. In this case the consumer threads of this instance always get many more messages than the other instance. I am looking for a way to make the consumer accept a message only when a thread is available in the thread pool to do the work.
0 -
@smpapas You can size the ThreadPoolTaskExecutor so that it has enough threads to handle the incoming message rate. You can also define a publish-subscribe channel (from Spring Integration) that uses the thread pool and has a queue capacity. If that queue capacity is reached, the consumer thread receives an exception when it tries to put the message onto the channel. You can then sleep, or apply other logic, to delay delivery of that message until the thread pool catches up. The rejectedExecutionHandler is the hook for such logic. I simply put the message back onto the queue for processing.
Here is an example of what I've done in the past and has worked for me.
<int:publish-subscribe-channel id="msg.service" task-executor="taskExecutor"/>

<beans:bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    <beans:property name="corePoolSize" value="1" />
    <beans:property name="maxPoolSize" value="${service.max-thread-count}" />
    <beans:property name="queueCapacity" value="100" />
    <beans:property name="rejectedExecutionHandler" ref="blockCallerExecutionPolicy" />
</beans:bean>
1 -
If I remember correctly, the executor is the same thread as the consumer thread. You may want to verify that.
public class BlockCallerExecutionPolicy implements RejectedExecutionHandler {

    private static final Logger logger = LoggerFactory.getLogger(BlockCallerExecutionPolicy.class);

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        try {
            logger.info("Putting message back on queue for processing.");
            executor.getQueue().put(r);
        } catch (InterruptedException e) {
            throw new RejectedExecutionException("Unexpected InterruptedException while waiting to add Runnable to ThreadPoolExecutor queue...", e);
        }
    }
}
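To see the blocking behavior of such a handler in isolation, here is a minimal sketch that uses only java.util.concurrent, with no Spring or Solace involved. The class names, pool sizes and sleep times are made up for illustration. The extra isShutdown() check is an addition that is not in the handler above. The loop in run() stands in for the consumer thread: once both worker threads are busy and the 2-slot queue is full, execute() blocks inside the handler instead of throwing.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BlockingHandoffDemo {

    /** Blocks the submitting thread until the work queue has room (illustrative name). */
    static class BlockCallerPolicy implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            // Assumption added for this sketch: never enqueue into a pool that is shutting down.
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Executor is shut down");
            }
            try {
                executor.getQueue().put(r); // waits here until a queue slot frees up
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
            }
        }
    }

    /** Submits the given number of slow tasks and returns how many completed. */
    static int run(int messages) throws InterruptedException {
        AtomicInteger processed = new AtomicInteger();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 30, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(2),   // small bounded queue so the handler is exercised
                new BlockCallerPolicy());
        for (int i = 0; i < messages; i++) {
            pool.execute(() -> {
                try {
                    Thread.sleep(20);          // simulated business logic
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                processed.incrementAndGet();
            });
        }
        pool.shutdown();                       // queued tasks still run after shutdown()
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Processed " + run(10) + " of 10 messages");
    }
}
```

Because the submitting thread is the one that blocks, it stops pulling new work while the pool is saturated. Whether that is enough to make the broker send messages to the other instance depends on the binder and flow settings, which this sketch does not model.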
1 -
nice suggestions @RobO!
@smpapas, just to toss another variable into the fire: we are adding client acknowledgement support to the Solace Spring Cloud Stream binder in the next release. This is under development now and should be released in late Feb. So, depending on your time frame, you might be able to leverage that to keep some of this simpler. It would allow you to hand off X messages to X threads for processing and then ACK each message from those threads.
More info here in the docs being updated for the next release.
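As a rough picture of the "hand off X messages to X threads, then ACK from those threads" idea, here is a minimal plain-Java sketch. It does not use the binder's API: the Ack interface below is a hypothetical stand-in for whatever acknowledgement callback the release ends up exposing, and the class and method names are invented for illustration. A Semaphore with one permit per worker makes the consumer thread wait whenever all workers are busy.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedHandoffSketch {

    /** Hypothetical per-message acknowledgement callback; not the binder's real API. */
    interface Ack {
        void accept();
    }

    private final ExecutorService workers;
    private final Semaphore permits;

    BoundedHandoffSketch(int workerCount) {
        this.workers = Executors.newFixedThreadPool(workerCount);
        this.permits = new Semaphore(workerCount); // one permit per worker thread
    }

    /** Called on the consumer thread; blocks while every worker is busy. */
    void onMessage(String payload, Ack ack) throws InterruptedException {
        permits.acquire();
        try {
            workers.execute(() -> {
                try {
                    process(payload);
                    ack.accept();          // acknowledge only after processing succeeded
                } finally {
                    permits.release();     // free the slot even if processing failed
                }
            });
        } catch (RuntimeException e) {
            permits.release();             // hand-off itself failed, give the permit back
            throw e;
        }
    }

    /** Placeholder for the real business logic. */
    void process(String payload) {
        try {
            Thread.sleep(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while processing " + payload, e);
        }
    }

    void shutdown() throws InterruptedException {
        workers.shutdown();
        workers.awaitTermination(10, TimeUnit.SECONDS);
    }

    /** Feeds the given number of messages through and returns how many were acknowledged. */
    static int run(int messages, int workerCount) throws InterruptedException {
        AtomicInteger acked = new AtomicInteger();
        BoundedHandoffSketch sketch = new BoundedHandoffSketch(workerCount);
        for (int i = 0; i < messages; i++) {
            sketch.onMessage("msg-" + i, acked::incrementAndGet);
        }
        sketch.shutdown();
        return acked.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Acked " + run(8, 3) + " of 8 messages");
    }
}
```

With this shape, a message whose processing throws is never acknowledged, so redelivery is left to the broker. How the real binder surfaces acknowledgements and how many unacknowledged messages it allows per flow are details to check in its documentation.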
0 -
Hi @smpapas, it will be released next week.
If you want to try it out early, you can check out the stage-2.0.0 branch. Obviously it may break as changes are being made, but in general you can build and install it locally to use it.
0