Try PubSub+

Message throttling from a solace queue

smpapassmpapas Member Posts: 11

Hi,
I am running 2 instances of my applications with 3 message consumers, each consuming messages from the same queue, using solace-spring-cloud-bom version 1.1.1. ie total 6 consumers 3 from each instance.
My issue is that when I publish large number of messages to this queue, the messages are not equally distributing to these 2 instances. Most of the messages are consumed by the 3 consumers from a particular instance. I have another pre-dined set of TaskExecutor thread pool to process each of these messages after consuming them. Since most of the messages are coming to this instance, it is delaying the process from an over all perspective of total number of messages. Is there any way I can control it so that if the TaskExecutors are busy, do not accept any more messages in the queue consumer so that the messages will go to the other instance of the same application.

Any suggestion is highly appreciated.
Thanks

Comments

  • RobORobO Member, Employee Posts: 7 Solace Employee

    @smpapas, you state "Most of the messages". Just confirming that there are messages getting to the 2nd instance of the application. Can you confirm that the Queue setting for Access Type is set to Non-Exclusive (Exclusive is set by default). Also confirm that you have 6 client connections from the web admin console.

  • smpapassmpapas Member Posts: 11

    I can confirm that there are 6 client connections from admin console.
    Also messages are received to both instances but the number of messages received in the instance one is much more than the one received by the second instance. It is like 90%:10%.
    I should not get any more messages unless I finished the processing of the current list of messages which my process is correctly working on.

  • marcmarc Member, Administrator, Moderator, Employee Posts: 298 admin

    Hi @smpapas 👋
    Are you using the Spring Cloud Stream Binder? or other Solace-Spring capabilities? If you are using the binder can you share your binding configuration? If other can you share where you are setting up your session/consumers?

    Also as @RobO mentioned, you should verify that the "Access Type" of your queue is "Non-Exclusive" just to be sure.

  • smpapassmpapas Member Posts: 11

    Access type is "Non-exclusine". I am using solace spring stream binder. Here are the configs
    spring:
    cloud:
    stream:
    function:
    definition: processMsg
    bindings:
    processMsg-in-0:
    destination: my.group
    group: msg.events.dev2

          consumer:
            concurrency: 3
            maxAttempts: 100
            backOffInitialInterval: 20000
            backOffMaxInterval: 120000
    
  • marcmarc Member, Administrator, Moderator, Employee Posts: 298 admin
    edited January 18

    Hi @smpapas ,
    Interesting that you're seeing this. Both myself & @RobO tried to duplicate this behavior but it seems to properly round robin the messages across all 6 consumers for us (concurrency of 3 for 2 instances). Can you verify that you're using solace-spring-cloud-bom v1.1.1?

    I basically just told my function to sleep for a bit and in PS+ Manager I can even see that unacked messages are being evenly distributed across the flows as well.

  • smpapassmpapas Member Posts: 11

    Thanks @marc
    "I basically just told my function to sleep for a bit and in PS+ Manage" - Can you pls explain what exactly you have done here./ There is no sleep in my code.

  • marcmarc Member, Administrator, Moderator, Employee Posts: 298 admin

    Hi @smpapas,
    Sure, to test this out I just wrote a simple consumer to simulate the business logic taking some processing time:

        @Bean
        public Consumer<String> uppercase() {
            return v -> {
                System.out.println("Received a msg on thread: " + Thread.currentThread().toString());
                try {
                    Thread.sleep(5000);
                } catch (InterruptedException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            };
        }
    
  • smpapassmpapas Member Posts: 11

    I was doing the same way earlier. My entire business logic(including db queries, other logic, etc.)also was done by the same consumer thread, but after running it in prod, once in a while this consumer thread got stuck for 5 minutes or 10 minutes in between the business logic processing. I thought it is something related with this consumer thread to do all these business logic. Now i am planning to use another thread from a predefined set of thread pool (TaskExecutor) to do the business logic. Here I am passing the message from consumer thread to the thread from the ThreadPoolTaskExecutor to do the business logic. In this case the consumer thread from this instance is always getting much more messages than the other instance. I am looking for a logic to control the consumer to accept message only when we have available thread to do the work from the threadpool

  • RobORobO Member, Employee Posts: 7 Solace Employee
    edited January 18

    @smpapas You can scale the ThreadPoolTaskExecutor to ensure you have an appropriate amount of threads to handle the incoming message rate. You can define a publish-subscribe channel that uses the thread pool and has a queue capacity. If that queue capacity is reached, the consumer thread will receive an exception attempting to put the message into that pubsub channel (from Spring Integration). You can issue a sleep or other logic to delay delivery of that message until the ThreadPool can catch up. The rejectedExecutionHandler is a handler for such logic. I simply tried to put the message back onto the channel for processing.

    Here is an example of what I've done in the past and has worked for me.

       <int:publish-subscribe-channel id="msg.service" task-executor="taskExecutor"/>
       <beans:bean id="taskExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
            <beans:property name="corePoolSize" value="1" />
            <beans:property name="maxPoolSize" value="${service.max-thread-count}" />
            <beans:property name="queueCapacity" value="100" />
            <beans:property name="rejectedExecutionHandler" ref="blockCallerExecutionPolicy" />
        </beans:bean>
    
  • smpapassmpapas Member Posts: 11

    Thank you Robo, that is what I am trying to do, 'I simply tried to put the message back onto the channel for processing." - How to put the message back into channel? I am not able to find the code to do this task

  • RobORobO Member, Employee Posts: 7 Solace Employee

    If I remember correctly, the executor is the same thread as the consumer thread. You may want to verify that.

    public class BlockCallerExecutionPolicy implements RejectedExecutionHandler {
        private static final Logger logger = LoggerFactory.getLogger(BlockCallerExecutionPolicy.class);
    
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                logger.info("Putting message back on queue for processing.");
                executor.getQueue().put(r);
            }
            catch (InterruptedException e) {
                throw new RejectedExecutionException("Unexpected InterruptedException while waiting to add Runnable to ThreadPoolExecutor queue...", e);
            }
        }
    }
    
  • marcmarc Member, Administrator, Moderator, Employee Posts: 298 admin
    edited January 18

    nice suggestions @RobO!

    @smpapas, Just to toss another variable in the fire we are adding client acknowledgement support to the solace cloud stream binder in the next release. This is under dev now and should be released late Feb. So depending on your time frame you might be able to leverage that to keep some of this simpler. It would allow you to handoff X messages to X threads for processing and then ACK the message from those threads.

    More info here in the docs being updated for the next release.

  • smpapassmpapas Member Posts: 11

    That is great news @Marc.
    @RobO for your suggestion and is working as expected now.

  • smpapassmpapas Member Posts: 11

    @marc when are you planning to release the next version which supports " client acknowledgement support"?

  • marcmarc Member, Administrator, Moderator, Employee Posts: 298 admin

    Hi @smpapas, it will be released next week :)

    If you want to try it out early you can checkout the stage-2.0.0 branch. Obviously it may break as changes are being made, but in general you can build and install that locally to use it.

Sign In or Register to comment.