Team, I am new to the Solace and i am currently using in the Solace SpringBoot JMS.
I have created a Queue and Published 500 Message to it. I want to implement a Multithreading behavior to my consumer which will consume the message parallelly and process instead of one by one.
I have added the minimum concurrency as 5 and maximum 10. When i start the application , i can see 5 consumers are in active but only one consumer is processing all the Messages. I want to know how we can use all the consumers and process the messages parlelly.
Sample Code:
@JmsListener(destination = "Q/tab/Consumer/test", concurrency = "5-10" )
public void handle(Message message) {
counter = counter+1;
Date receiveTime = new Date();
System.out.println(
"Consumer Process started at Received at " + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(receiveTime));
if (message instanceof TextMessage) {
TextMessage tm = (TextMessage) message;
try {
tm.getText();
System.out.println(Thread.currentThread().getId());
System.out.println("Processed ----" + counter + " so far" );
} catch (JMSException e) {
e.printStackTrace();
}
} else {
System.out.println(message.toString());
}
}
Welcome to the Solace Community!
Can you check your queue access type under Settings?
You’d want to use a “non-exclusive” queue to spread the load across multiple consumers. Exclusive would deliver all the messages to the first consumer to connect and treat the others as secondary, tertiary, etc.
@RobO ,My concurrency attribute set to 5, In this scenario it is creating 5 consumers on my queue. Is it possible to have a single consumer with multiple threads ?
Check out the @Async annotation. The single consumer thread can invoke a method that has the annotation of @Async with a defined Task Executor (defines the number of threads). Each call to the method should be handled in a new thread and the consumer thread should not block waiting for a return. In other words, the consumer thread should consume the next message and hand it off to the next thread, etc, etc, etc. A simple example of an async method is here.
A more complex way to accomplish similar capability is to use the Spring Integration framework. You can create a channel (in-memory queue) that the messages can be put onto and then threads can pull from that channel and process. It still uses the TaskExecutor feature from Spring to accomplish this.