Synchronizing onComplete event for a SolCache subscribe request

alinaqvi
alinaqvi Member Posts: 35

Happy new year all,
I want to find a way to synchronize the onComplete callback for a SolCache subscribe request.

cacheSession.sendCacheRequest(reqId, solaceTopic, true, CacheLiveDataAction.FLOW_THRU, new CacheRequestListener() {

    @Override
    public void onException(Long cacheReqId, Topic topic, JCSMPException exception) {
        LOG.error("Consumer Received exception for solace-cache-topic:" + exception);
    }

    @Override
    public void onComplete(Long cacheReqId, Topic topic, CacheRequestResult reqResult) {
        // onComplete is called async and there could still be messages coming via other threads when this is invoked.
        handleCacheReqOnComplete(cacheReqId, topic, reqResult, listener);
    }
});

At the moment, inside the handleCacheReqOnComplete method we start a new thread that sleeps for one second, to let any messages still in transit come through, and then we run the cache-request-complete logic. The problem is that one second is an arbitrary number, and if we have a lot of messages it might not be long enough.

Is there a way to find out how many messages will be sent to us as part of the initial cache subscribe when we call cacheSession.sendCacheRequest? We could then count the messages received and run the onComplete logic once the count is reached.

Thanks in advance.

Answers

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 636 admin

    Hi alinaqvi! If you're a Solace customer, you should probably directly email Solace Support for an official answer.

    That said, the comment in the code block above does not appear in the Solace documentation. It is true that onComplete() runs asynchronously (i.e. via callback) at the end of a cache response. However, the messages from the cache are ordered, and the API receives the "end of response" message after all the cached messages.

    There are also no parallel threads within the Java JCSMP API dispatching messages: one thread (the reactor) reads messages from the socket and places them on an internal notification queue, and another thread (the dispatcher) delivers them to the application via callback. The same thread should run both XMLMessageListener.onReceive() and CacheRequestListener.onComplete(), so order should be maintained: you'd only get onComplete() after all the cached messages have been sent to the API. You shouldn't need to wait for 1 second. That is, unless you are taking messages out of the onReceive() callback and handing them to other worker threads to be processed?

    If not, have you seen, or can you prove, that cache response messages arrive on the message callback AFTER onComplete() has run? Obviously, you could be receiving live data messages, since you are adding a subscription as part of your cache request (the true argument in the sendCacheRequest() call). Do you check inside onReceive() whether messages are "cached" or "live" (XMLMessage.isCacheMessage())?

    Let us know. Thanks!
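To make the ordering argument concrete, here is a minimal sketch in plain Java. It is a hypothetical model, not the JCSMP API: the `Kind` enum, the queue, and the `SingleDispatcherDemo` class are all assumptions made for illustration. It only shows why a single queue drained by a single dispatcher thread cannot deliver the completion event before the cached messages that were enqueued ahead of it.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical model only: nothing here calls the JCSMP API.
final class SingleDispatcherDemo {

    // Stand-in for what the dispatcher delivers: a cached message,
    // a live message, or the end-of-response marker.
    enum Kind { CACHED, LIVE, END_OF_RESPONSE }

    // The calling thread plays the "reactor" and enqueues events in arrival
    // order; one "dispatcher" thread drains the queue and records what the
    // application callbacks would see, in order.
    static List<String> run(int cachedCount) throws InterruptedException {
        BlockingQueue<Kind> notifications = new LinkedBlockingQueue<>();
        // Written only by the dispatcher thread; read after join().
        List<String> seen = new ArrayList<>();

        Thread dispatcher = new Thread(() -> {
            try {
                while (true) {
                    Kind kind = notifications.take();
                    if (kind == Kind.END_OF_RESPONSE) {
                        seen.add("onComplete");
                        return;
                    }
                    seen.add("onReceive:" + kind);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "dispatcher");
        dispatcher.start();

        // One live message from the subscription, then the cached messages,
        // then the end-of-response marker.
        notifications.put(Kind.LIVE);
        for (int i = 0; i < cachedCount; i++) {
            notifications.put(Kind.CACHED);
        }
        notifications.put(Kind.END_OF_RESPONSE);

        dispatcher.join();
        return seen;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(3));
    }
}
```

In this model "onComplete" is always the last entry, because the queue is FIFO and only one thread consumes it. The guarantee disappears as soon as the callback hands messages to other threads, which is the case raised at the end of the answer above.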

  • alinaqvi
    alinaqvi Member Posts: 35

    Hi Aaron,

    Yes, we are a Solace customer. We commonly observe the JCSMP API calling onComplete asynchronously, and we receive messages after it has been called.

    Thanks,
    Ali

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 636 admin

    Hey Ali... are you sure these are cache response messages (XMLMessage.isCacheMessage() == true), and not normal "live" messages from your subscription? If so, I don't think that's correct behaviour, but I'm not 100% sure. Please escalate to Solace Support for further investigation. The Solace Community forum is best-effort for casual users, and Support doesn't watch this space. Thanks!

  • alinaqvi
    alinaqvi Member Posts: 35

    Sorry, I forgot about this, but I wanted to give a quick update in case it helps others. Aaron is right: all our messages are received by the same thread (Context_1_ConsumerDispatcher). The syncing issues come from our app logic.
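For anyone who lands here with the same symptom, one possible way to avoid the arbitrary sleep when messages are handed off to a worker is sketched below. It is an assumption about the application side, not the code from this thread and not part of JCSMP: `HandOffDemo`, its `onReceive`/`onComplete` methods, and the string payloads are hypothetical stand-ins. The idea is to queue the completion logic on the same single-threaded executor that processes the messages, so it runs only after every message handed off before it.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical application-side sketch: the method names mirror the callbacks
// discussed in this thread, but nothing here calls the JCSMP API.
final class HandOffDemo {

    // A single worker thread processes its tasks in submission order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    final List<String> processed = new CopyOnWriteArrayList<>();

    // Invoked on the dispatcher thread for each message: hand off and return.
    void onReceive(String payload) {
        worker.execute(() -> {
            try {
                Thread.sleep(5); // simulate slow processing
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            processed.add(payload);
        });
    }

    // Invoked on the dispatcher thread at the end of the cache response.
    // Instead of sleeping, queue the completion logic behind the pending work.
    void onComplete() {
        worker.execute(() -> processed.add("complete"));
    }

    void shutdownAndWait() throws InterruptedException {
        worker.shutdown();
        worker.awaitTermination(10, TimeUnit.SECONDS);
    }

    // Simulates the dispatcher delivering n messages followed by onComplete.
    static List<String> run(int n) throws InterruptedException {
        HandOffDemo demo = new HandOffDemo();
        for (int i = 0; i < n; i++) {
            demo.onReceive("msg-" + i);
        }
        demo.onComplete();
        demo.shutdownAndWait();
        return demo.processed;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(4));
    }
}
```

This relies on the worker being a single thread. With a multi-threaded pool the FIFO guarantee no longer holds, and the application would need to track in-flight messages itself (for example with a counter) before running the completion logic.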

  • hong
    hong Guest Posts: 480 ✭✭✭✭✭

    Hi @alinaqvi, if Aaron's comment answers your question, can you accept it as the best answer for the benefit of the community?