JMS Queue Browsing (Received incoming message with no active flow)

Joe
Joe Member Posts: 2

I am developing a Spring Boot Application (version 2.7.5). Currently, I'm trying to create a queue broswer using the following code:

public int getMessageCount() {
    return jmsTemplate.browse(fromQueueName, new BrowserCallback<Integer>() {
        @Override
        public Integer doInJms(Session s, QueueBrowser qb) throws JMSException {
            return Collections.list(qb.getEnumeration()).size();
        }
    });
}

The jmsTemplate bean is configured as follows:

public class Configuration {
  @Value("${solace.jms.sessionCacheSize}")
  private int sessionCacheSize;

  @Value("${solace.jms.receiveTimeout}")
  private int receiveTimeout

  @Bean
  public ConnectionFactory solConnectionFactory(
        @Value("${solace.jms.broker}") String broker,
        @Value("${solace.jms.port}") Integer port,
        @Value("${solace.jms.msg-vpn}") String vpn,
        @Value("${solace.jms.client-username}") String username,
        @Value("${solace.jms.client-password}") String password
  ) {
    try {
        var solConnectionFactory = SolJmsUtility.createConnectionFactory();
        solConnectionFactory.setHost(broker);
        solConnectionFactory.setPort(port);
        solConnectionFactory.setVPN(vpn);
        solConnectionFactory.setUsername(username);
        solConnectionFactory.setPassword(password);
        solConnectionFactory.setDirectTransport(false);

        return solConnectionFactory;
    } catch (Exception e) {
        // TODO:
        throw new RuntimeException(e);
    }
  }

  @Bean
  public CachingConnectionFactory cachingConnectionFactory(ConnectionFactory connectionFactory) {
    CachingConnectionFactory cachingConnectionFactory = new CachingConnectionFactory(connectionFactory);
    cachingConnectionFactory.setSessionCacheSize(sessionCacheSize);

    return cachingConnectionFactory;
  }

  @Bean
  public JmsTemplate jmsTemplate(CachingConnectionFactory cachingConnectionFactory) {
    JmsTemplate jmsTemplate = new JmsTemplate(cachingConnectionFactory);
    jmsTemplate.setReceiveTimeout(receiveTimeout);
    jmsTemplate.setPubSubDomain(false);

    return jmsTemplate;
  }
}

After setting the log level to debug, I found the following log message (A message with the same id definitely exists in the queue):

2023-01-20 18:00:32.150 DEBUG 1 --- [2_ReactorThread] c.s.jcsmp.impl.flow.SubFlowManagerImpl   : Client-1: regActiveFlow id=102
2023-01-20 18:00:32.150 DEBUG 1 --- [2_ReactorThread] c.s.jcsmp.impl.flow.SubFlowManagerImpl   : Client-1: addManagedFlow Flowid=102
2023-01-20 18:00:32.150 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.flow.FlowHandleImpl       : Client-1:SubFlow-102: Flow 102: getWindowSize()=255
2023-01-20 18:00:32.151 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.flow.FlowSmfUtil          : tpCreateAck: flow_d: 102, TpMsgId: 0, windowSz: 255
2023-01-20 18:00:32.151 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.flow.FlowHandleImpl       : Client-1:SubFlow-102: tpCreateAck: flow_d: 102, tt_lastInOrderTpMsg: 0, tt_windowSz: 255
2023-01-20 18:00:32.151 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.flow.FlowHandleImpl       : Client-1:SubFlow-102: Start flow, force ack>>> flow=102, tp=0, ws=255
2023-01-20 18:00:32.152 DEBUG 1 --- [2_ReactorThread] c.s.jcsmp.protocol.impl.SmfUhUtil        : Ignored one or more unknown parameters in SMF header.
2023-01-20 18:00:32.153 DEBUG 1 --- [sumerDispatcher] s.j.p.n.i.ConsumerNotificationDispatcher : Consumer dispatcher thread starts
2023-01-20 18:00:32.153 DEBUG 1 --- [  restartedMain] com.solacesystems.jms.SolQueueBrowser    : Client-1:SubFlow-102: SolQueueBrowser created.  Queue: secm/q/sit/secm/billingkltp/error   messageSelector: null
2023-01-20 18:00:32.235 DEBUG 1 --- [  restartedMain] com.solacesystems.jms.SolQueueBrowser    : Client-1:SubFlow-102: D close().
2023-01-20 18:00:32.235 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.flow.FlowHandleImpl       : Client-1:SubFlow-102: Flow 102: pauseFlowInternally=false
2023-01-20 18:00:32.235 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.flow.FlowHandleImpl       : Client-1:SubFlow-102: Flow 102: _startState=STOPPED
2023-01-20 18:00:32.235 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.flow.FlowHandleImpl       : Client-1:SubFlow-102: Flow 102: Clear AD timer
2023-01-20 18:00:32.236 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.flow.FlowHandleImpl       : Client-1:SubFlow-102: Flow 102: Clear AD timer
2023-01-20 18:00:32.236 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.flow.FlowHandleImpl       : Client-1:SubFlow-102: Flow 102: window size 0 due to startState=STOPPED
2023-01-20 18:00:32.236 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.flow.FlowHandleImpl       : Client-1:SubFlow-102: Flow 102: getWindowSize()=0
2023-01-20 18:00:32.236 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.flow.FlowSmfUtil          : tpCreateAck: flow_d: 102, TpMsgId: 0, windowSz: 0
2023-01-20 18:00:32.236 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.flow.FlowHandleImpl       : Client-1:SubFlow-102: tpCreateAck: flow_d: 102, tt_lastInOrderTpMsg: 0, tt_windowSz: 0
2023-01-20 18:00:32.236 DEBUG 1 --- [  restartedMain] c.s.j.impl.queues.UnackedMessageList2    : UNACKLIST-ack>>> reason=flow-closing flow=102
2023-01-20 18:00:32.238 DEBUG 1 --- [  restartedMain] c.s.j.protocol.impl.TcpClientChannel     : Client-1: sendUnbindRequest (smfclient 1) sendAdCtrlRequest Response 0 for flowId 102
2023-01-20 18:00:32.238 DEBUG 1 --- [  restartedMain] c.s.jcsmp.impl.RequestResponseTask       : RequestResponseTask ([UBRT resource=secm/q/sit/secm/billingkltp/error flowId=102 counter=0]) startTimer 
2023-01-20 18:00:32.244 DEBUG 1 --- [2_ReactorThread] c.s.jcsmp.impl.flow.SubFlowManagerImpl   : Client-1: Demux pub msg:com.solacesystems.jcsmp.impl.JCSMPGenericXMLMessage[messageId=138454,ackMessageId=138454,prevId=0,CID_count=0,userData=,type=PERSISTENT,priority=4,redelivered=false,timeToLive=0,expiration=0,dmqEligible=false,topicSeqNum=null,metadataLen=0,contentLen=0,attLen=557,sendAttemptedOnce=false,ackImmediately=false,safeToRelease=false,retransmitting=false,[email protected]]
2023-01-20 18:00:32.244 DEBUG 1 --- [2_ReactorThread] c.s.jcsmp.impl.flow.SubFlowManagerImpl   : Client-1: Received incoming message with no active flow found for flowId=102, ignoring.
2023-01-20 18:00:32.244 DEBUG 1 --- [2_ReactorThread] c.s.jcsmp.protocol.impl.SmfUhUtil        : Ignored one or more unknown parameters in SMF header.


I also found the following feature matrix in the documentation:

I'm a bit unsure about whether creating a QueueBrowser is supported using JMS, or if I've misconfigured the queue. I'm also not clear about what a flow is.

Any help would be much appreciated.

Tagged:

Comments

  • murat
    murat Member, Employee Posts: 9 Solace Employee
    edited January 23 #2

    Hi @Joe ,

    Thank you for your question and I can confirm that queue browsing is supported using our JMS API (I will ask for that matrix to be updated). We actually include a sample within our JMS .zip package to demonstrate how to browse a queue's contents called SolJMSQueueBrowser.

    I have attached the sample file to this reply so you can get a feel of how this can be done.


    The full Solace JMS API zip can be downloaded using this link here:https://solace.com/downloads/

    Cheers,

    MV

  • Joe
    Joe Member Posts: 2

    Hi Murat,

    it is finally working. As per this SO question, I need to start the application with the JVM system property:

    https://stackoverflow.com/questions/51316721

    -DSolace_JMS_Browser_Timeout_In_MS=1000
    

    This is a bit unexpected. Is there a way to do this programmatically?

  • murat
    murat Member, Employee Posts: 9 Solace Employee

    Hi @Joe ,

    I've been told that this property can either be set in the JVM as mentioned in that stackoverflow ticket or it can be set as a JMS initial context variable. Please try it out and let me know if you still can't get it to work.

    Cheers,

    MV