About Callback mechanism in the JCSMP

Sathishkumar
Sathishkumar Member Posts: 4
edited October 29 in PubSub+ Event Broker #1

When a message is published to the incorrect topic or encounters a permission issue, no exception is raised. Although JCSMPStreamingPublishCorrelatingEventHandler has been implemented, the handleErrorEx and responseReceivedEx methods are not being invoked, making it difficult to track missing messages.

import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import com.ibsplc.iloyal.plugins.projectspecific.config.EventsConfiguration;
import com.ibsplc.iloyal.plugins.projectspecific.dto.ExternalConfigurationDTO;
import com.ibsplc.iloyal.plugins.projectspecific.dto.JMSHeaderDTO;
import com.ibsplc.xibase.util.log.Log;
import com.ibsplc.xibase.util.log.factory.LogFactory;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.InvalidPropertiesException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
import com.solacesystems.jcsmp.JCSMPTransportException;
import com.solacesystems.jcsmp.SDTException;
import com.solacesystems.jcsmp.SDTMap;
import com.solacesystems.jcsmp.TextMessage;
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessageProducer;
@Component
public class EventsConfirmedPublish {

    private static final Log LOGGER = LogFactory.getLogger(EventsConfirmedPublish.class.getName());

    @Autowired
    private EventsConfiguration eventsConfiguration;

    final int count = 5;
    final CountDownLatch latch = new CountDownLatch(count); // used for synchronizing between threads

    /**
     * A correlation structure. This structure is passed back to the publisher
     * callback when the message is acknowledged or rejected
     */
    class MsgInfo {
        public volatile boolean acked = false;
        public volatile boolean publishedSuccessfully = false;
        public BytesXMLMessage sessionIndependentMsg = null;
        public final long id;

        public MsgInfo(long id) {
            this.id = id;
            LOGGER.log(Log.FINE, "[FRM] MsgInfo acked from constructor - {}", acked);
        }

        @Override
        public String toString() {
            return String.format("Message ID: %d, Published Confirmation: %b, Published Successfully: %b", id, acked,
                    publishedSuccessfully);
        }
    }

    class PubCallback implements JCSMPStreamingPublishCorrelatingEventHandler {

        // Called when the broker rejects (NACKs) a message
        @Override
        public void handleErrorEx(Object arg0, JCSMPException arg1, long arg2) {
            if (arg0 instanceof MsgInfo) {
                MsgInfo i = (MsgInfo) arg0;
                i.acked = true;
                LOGGER.log(Log.SEVERE, "Message response (rejected) received for {}, error was {} \n", i, arg1);
            } else {
                LOGGER.log(Log.FINE, "Received error but correlation object is not of type MsgInfo");
            }
            latch.countDown();
        }

        // Called when the broker accepts (ACKs) a message
        @Override
        public void responseReceivedEx(Object arg0) {
            if (arg0 instanceof MsgInfo) {
                MsgInfo i = (MsgInfo) arg0;
                i.acked = true;
                i.publishedSuccessfully = true;
                LOGGER.log(Log.FINE, "Message response (accepted) received for {} \n", i);
            } else {
                LOGGER.log(Log.FINE, "Received response but correlation object is not of type MsgInfo");
            }
            latch.countDown();
        }
    }

    public void sendMessage(JMSHeaderDTO jmsHeaderDTO, String outGoingMsg)
            throws JCSMPTransportException, JCSMPException, IOException {

        ExternalConfigurationDTO externalConfigs = eventsConfiguration.getConfigurations();
        final LinkedList<MsgInfo> msgList = new LinkedList<MsgInfo>();
        JCSMPSession session = null;
        try {
            LOGGER.log(Log.FINE, "ConfirmedPublish initializing...");
            // Create a JCSMP Session
            final JCSMPProperties properties = new JCSMPProperties();
            properties.setProperty(JCSMPProperties.USERNAME, externalConfigs.getSolaceConfigs().getSolaceUsername());
            properties.setProperty(JCSMPProperties.PASSWORD, externalConfigs.getSolaceConfigs().getSolacePassword());
            properties.setProperty(JCSMPProperties.HOST, externalConfigs.getSolaceConfigs().getSolaceHost());
            properties.setProperty(JCSMPProperties.VPN_NAME, externalConfigs.getSolaceConfigs().getSolaceVpn());
            session = JCSMPFactory.onlyInstance().createSession(properties);
            session.connect();
            LOGGER.log(Log.FINE, "JCSMP session connected");
            final Topic solaceTopic = JCSMPFactory.onlyInstance().createTopic(jmsHeaderDTO.getDestination());
            final XMLMessageProducer producer = session.getMessageProducer(new PubCallback());
            LOGGER.log(Log.FINE, "About to send message to Topic - {}", solaceTopic.getName());
            TextMessage message = createTextMessage(jmsHeaderDTO, outGoingMsg, externalConfigs);
            // The application will wait and confirm the message is published
            // successfully.
            // In this case, wrap the message in a MsgInfo instance, and
            // use it as a correlation key.
            for (int i = 1; i <= count; i++) {
                final MsgInfo msgCorrelationInfo = new MsgInfo(i); // use the loop index as the message ID
                msgCorrelationInfo.sessionIndependentMsg = message;
                msgList.add(msgCorrelationInfo);

                // Set the message's correlation key. This reference
                // is used when calling back to responseReceivedEx().
                message.setCorrelationKey(msgCorrelationInfo);
                // **Important** Set delivery mode to PERSISTENT for guaranteed messaging
                message.setDeliveryMode(DeliveryMode.PERSISTENT);
                message.setAckImmediately(true);
                LOGGER.log(Log.FINE, "Message - count {} - {}", i, message.toString());
                producer.send(message, solaceTopic);
            }

            LOGGER.log(Log.FINE, "Messages sent. Processing replies");
            try {
                latch.await();
            } catch (InterruptedException ex) {
                LOGGER.log(Log.SEVERE, "[FRM] I was awoken while awaiting");
                Thread.currentThread().interrupt(); // preserve the interrupt status for callers
            }
            // Process the replies
            while (msgList.peek() != null) {
                final MsgInfo ackedMsgInfo = msgList.poll();
                LOGGER.log(Log.FINE, "Removing acknowledged message {} from application list.\n", ackedMsgInfo);
            }
            LOGGER.log(Log.FINE, "Message successfully published to Solace topic : {}", solaceTopic);
        } catch (InvalidPropertiesException ex) {
            LOGGER.log(Log.SEVERE, "InvalidPropertiesException occurred while connecting to JCSMPSession session. {}",
                    ex.getMessage());
            throw new InvalidPropertiesException(
                    "InvalidPropertiesException occurred while connecting to JCSMPSession session.", ex);
        } catch (JCSMPException ex) {
            LOGGER.log(Log.SEVERE, "JCSMPException occurred while publishing event data. {}", ex.getMessage());
            throw new JCSMPException("JCSMPException occurred while publishing event data.", ex);
        } finally {
            if (session != null) {
                session.closeSession();
            }
        }
    }

    private TextMessage createTextMessage(JMSHeaderDTO jmsHeaderDTO, String outGoingMsg,
            ExternalConfigurationDTO externalConfigs) throws SDTException {
        TextMessage message = JCSMPFactory.onlyInstance().createMessage(TextMessage.class);
        // Set message headers
        SDTMap map = JCSMPFactory.onlyInstance().createMap();
        map.putString(externalConfigs.getJmsHeaderKeys().getJmsCorrelation(), jmsHeaderDTO.getCorrelationID());
        map.putString(externalConfigs.getJmsHeaderKeys().getJmsContentType(), jmsHeaderDTO.getDataContentType());
        map.putString(externalConfigs.getJmsHeaderKeys().getJmsDestination(), jmsHeaderDTO.getDestination());
        map.putString(externalConfigs.getJmsHeaderKeys().getJmsEvent(), jmsHeaderDTO.getEvent());
        map.putString(externalConfigs.getJmsHeaderKeys().getJmsEventId(), jmsHeaderDTO.getEventId());
        message.setProperties(map);
        message.setText(outGoingMsg);
        message.setHTTPContentType(jmsHeaderDTO.getDataContentType());
        return message;
    }

}

Please advise

Answers

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 634 admin

    Hi @Sathishkumar, welcome to the Community.

    Please edit your post and reformat using code blocks. Very hard to read.

    Please provide a working sample rather than just dumping a bunch of code from different classes.

    Please test with known working samples, such as Guaranteed Publisher. I can see NACKs for both queue full and publish ACL block:

    ~LOG WARN~ 16:21:48.765 [Context_1_ProducerDispatcher] WARN  com.solac.sampl.jcsmp.patte.GuaranteedPublisher - NACK for Message com.solacesystems.jcsmp.impl.JCSMPGenericXMLMessage[messageId=112625,ackMessageId=0,prevId=0,CID_count=0,userData=,type=PERSISTENT,priority=4,redelivered=false,timeToLive=0,expiration=0,dmqEligible=false,topicSeqNum=null,metadataLen=0,contentLen=0,attLen=512,sendAttemptedOnce=false,ackImmediately=false,safeToRelease=false,retransmitting=false,sendCount=0@2da2c40] - ((Client name: AaronsThinkPad3/5061/aa5a775e83dde6050001/W6biLvjL4N   local(/0:0:0:0:0:0:0:1%0:53052) remote(0.0.0.0/0:0:0:0:0:0:0:1:55555)) - )  com.solacesystems.jcsmp.JCSMPErrorResponseException: 503: Spool Over Quota. Queue or Topic endpoint limit exceeded - Topic 'solace/samples/jcsmp/pers/pub/B' [Subcode:30]
    ~LOG WARN~ 16:22:32.224 [Context_1_ProducerDispatcher] WARN  com.solac.sampl.jcsmp.patte.GuaranteedPublisher - NACK for Message com.solacesystems.jcsmp.impl.JCSMPGenericXMLMessage[messageId=113272,ackMessageId=0,prevId=0,CID_count=0,userData=,type=PERSISTENT,priority=4,redelivered=false,timeToLive=0,expiration=0,dmqEligible=false,topicSeqNum=null,metadataLen=0,contentLen=0,attLen=512,sendAttemptedOnce=false,ackImmediately=false,safeToRelease=false,retransmitting=false,sendCount=0@69959639] - ((Client name: AaronsThinkPad3/5221/920d107a61255ca50001/utqDL244Js   local(/0:0:0:0:0:0:0:1%0:53620) remote(0.0.0.0/0:0:0:0:0:0:0:1:55555)) - )  com.solacesystems.jcsmp.JCSMPErrorResponseException: 403: Publish ACL Denied - Topic 'solace/samples/jcsmp/pers/pub/S' [Subcode:28]
    

    Please verify that your queue is not configured to silently discard messages on reject.

    Note that the new Java API is simpler and easier to use. Perhaps you should use that instead of JCSMP, especially if this is a new project?

  • Sathishkumar
    Sathishkumar Member Posts: 4

    When I update the topic/destination with an incorrect path, I do not receive the expected exception:

    com.solacesystems.jcsmp.JCSMPErrorResponseException: 403: Publish ACL Denied
    

    However, when I update the connection details with incorrect inputs, I do receive a JCSMPException. As you mentioned, I've updated the code format for your reference to get suggestions.

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 634 admin

    So, how do you mean the topic is an "incorrect path"? By default, you can publish to any topic you want to, unless you configure your ACL profile to disallow it. What is your ACL profile configuration?

    Or, if you want to configure your app so that the broker will "NACK this topic message if nobody is listening on this topic", you need to enable that in the client profile your app is using.

    In your first post, you say:

    Although JCSMPStreamingPublishCorrelatingEventHandler has been implemented, the handleErrorEx and responseReceivedEx methods are not being invoked, making it difficult to track missing messages.

    Is this still true? That should be impossible if you are actually publishing with DeliveryMode of PERSISTENT.

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 634 admin

    Wow ok, looking at your code a bit more closely, you are looping 5 times sending the exact same message object 5 times, and using the exact same message object for correlation?? Please don't do that. Create a new message each time you want to publish.
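
    To illustrate why reusing one message object for several sends is risky, here is a minimal sketch in plain Java. It does not use JCSMP at all: `Msg` is a made-up stand-in for a mutable message with a correlation key, and the `inFlight` list is a stand-in for whatever holds references to sent-but-unconfirmed messages. It makes no claim about how JCSMP stores messages internally; it only shows the aliasing problem in general.

    ```java
    import java.util.ArrayList;
    import java.util.List;

    /** Hypothetical stand-in for a mutable API message that carries a correlation key. */
    class Msg {
        Object correlationKey;
    }

    class CorrelationDemo {
        /** Reuses one message object: every in-flight reference ends up with the last key. */
        static List<Object> keysWhenReusing(int count) {
            List<Msg> inFlight = new ArrayList<>();
            Msg shared = new Msg();
            for (int i = 1; i <= count; i++) {
                shared.correlationKey = "msg-" + i; // overwrites the key from the previous pass
                inFlight.add(shared);               // stand-in for producer.send(...)
            }
            return keysOf(inFlight);
        }

        /** Creates a new message and a new key per publish: each message keeps its own key. */
        static List<Object> keysWhenFresh(int count) {
            List<Msg> inFlight = new ArrayList<>();
            for (int i = 1; i <= count; i++) {
                Msg fresh = new Msg();
                fresh.correlationKey = "msg-" + i;
                inFlight.add(fresh);
            }
            return keysOf(inFlight);
        }

        private static List<Object> keysOf(List<Msg> messages) {
            List<Object> keys = new ArrayList<>();
            for (Msg m : messages) {
                keys.add(m.correlationKey);
            }
            return keys;
        }

        public static void main(String[] args) {
            System.out.println(keysWhenReusing(3)); // [msg-3, msg-3, msg-3]
            System.out.println(keysWhenFresh(3));   // [msg-1, msg-2, msg-3]
        }
    }
    ```

    In the posted code, the equivalent change would be to call createTextMessage(...) inside the loop so that each send gets its own message and its own MsgInfo.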

    Also, this sendMessage(JMSHeaderDTO, String) method that you have implemented… it has all the broker connection logic inside it? That would imply that each time you want to send a single message, you have to establish a connection. This is very inefficient: the Session to the broker should be connected once at the beginning of the application lifetime and left up for the duration… pass in the connected Session, or keep it as a variable of some long-lived instance that this method can reference.

  • Sathishkumar
    Sathishkumar Member Posts: 4

    About your questions - So, how do you mean the topic is an "incorrect path"? By default, you can publish to any topic you want to, unless you configure your ACL profile to disallow it. What is your ACL profile configuration?

    As a producer, I am sending messages to the customer (I will remove the loop; it was just for my own verification, and our use case is to send a single message only). The customer (target system) has provisioned the ACL profile configuration, but for one of the topics, they neglected to include the topic details in the ACL. When I publish a message to this topic, no exception is raised, which led us to investigate the issue. We discovered that the exceptions are not being triggered and none of the methods in the implemented callback mechanism are executing.

    Note: After correcting the ACL profile, we are able to send messages without any issues. However, the methods in the implemented callback mechanism are still not executing. The only time a JCSMPException is raised is when there are issues related to the host, username or password.

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 634 admin

    Hi @Sathishkumar, I'm not sure why you aren't seeing any ACKs or NACKs being generated within your app… that is, calls to the responseReceivedEx and handleErrorEx methods of the JCSMPStreamingPublishCorrelatingEventHandler. They should be called for every message that you send if the DeliveryMode is set to PERSISTENT. That is the number one issue to address. Even if the ACL profile is configured correctly and your app is allowed to publish, you should still get an ACK.

    It might (?) have something to do with you reusing the same message object multiple times? But I'm assuming you see the same behaviour with publishing only 1 message?

    What does your logging show? You have a bunch of LOGGER.log() methods everywhere..?

    Did you try running the JCSMP sample I provided above? I know it works. You could add more verbose logging in the JCSMPStreamingPublishCorrelatingEventHandler class at the bottom to display the ACKs and NACKs on the console.
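
    One way to check in your own app whether every send really gets a response is to keep a set of outstanding message ids and log whatever is left after a while. This is a minimal sketch in plain Java, assuming each correlation object carries a numeric id as MsgInfo does. `OutstandingTracker` and its method names are made up for illustration and are not part of JCSMP.

    ```java
    import java.util.Set;
    import java.util.SortedSet;
    import java.util.TreeSet;
    import java.util.concurrent.ConcurrentHashMap;

    /** Tracks which published messages have not yet received an ACK or NACK. */
    class OutstandingTracker {
        // Thread-safe set: the publisher thread adds, the callback thread removes.
        private final Set<Long> outstanding = ConcurrentHashMap.newKeySet();

        /** Call just before send(), with the id stored in the correlation object. */
        void published(long id) {
            outstanding.add(id);
        }

        /** Call from both callbacks; returns false if the id was unknown or already resolved. */
        boolean resolved(long id) {
            return outstanding.remove(id);
        }

        /** Sorted snapshot of the ids still waiting for a broker response. */
        SortedSet<Long> pending() {
            return new TreeSet<>(outstanding);
        }
    }

    class TrackerDemo {
        public static void main(String[] args) {
            OutstandingTracker tracker = new OutstandingTracker();
            tracker.published(1);
            tracker.published(2);
            tracker.published(3);
            tracker.resolved(1); // e.g. called from responseReceivedEx
            tracker.resolved(3); // e.g. called from handleErrorEx
            System.out.println("Still waiting on: " + tracker.pending()); // Still waiting on: [2]
        }
    }
    ```

    If pending() never drains, the callbacks are not firing at all; if it drains only for some ids, the problem is specific to those messages.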

  • Sathishkumar
    Sathishkumar Member Posts: 4
    edited November 18 #8

    Hi @Aaron ~ I have included the logs here. Could you please take a look?

    Code snippet attached, here.

    No logs were printed under INFO or WARN levels at the time of publishing the message to the invalid topic. However, this does not imply that no exceptions were raised. The process is functioning as expected, except for the callback/acknowledgment. Additionally, no exceptions such as JCSMPException or JCSMPErrorResponseException were raised for invalid topic.

    Thanks

    Sathishkumar A

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 634 admin

    Hey @Sathishkumar, looking at your code, the most obvious thing is that your sendMessage() method exits before waiting to receive the ACK from the broker. You've defined all the connection info, the Session, etc. inside this method… when the method returns after you send, the Session variable loses scope and is torn down… so the ACKs or NACKs will never arrive. Remember that the send() call is asynchronous… the method returns once the message has been written to the socket, not once it has been confirmed by the broker. You have to stick around to wait for the broker response. You could implement a mechanism using concurrency objects like a CyclicBarrier to coordinate between the thread that is publishing and the ACK/NACK callbacks.
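
    As a rough sketch of that coordination in plain Java: the example below uses a one-shot CountDownLatch per message rather than a CyclicBarrier, plus a timeout so the publisher cannot block forever. `PublishConfirmation` is a made-up class, not part of JCSMP. The assumption is that you would set one instance as the message's correlation key and call ack() or nack() from responseReceivedEx and handleErrorEx respectively. The "broker" thread here is only a stand-in for the API's callback thread.

    ```java
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.TimeUnit;

    /** Per-message correlation object: create one instance per published message. */
    class PublishConfirmation {
        enum Outcome { PENDING, ACKED, NACKED }

        private final CountDownLatch done = new CountDownLatch(1);
        private volatile Outcome outcome = Outcome.PENDING;
        private volatile Exception error;

        /** Intended to be called from responseReceivedEx(Object key). */
        void ack() {
            outcome = Outcome.ACKED;
            done.countDown();
        }

        /** Intended to be called from handleErrorEx(Object key, JCSMPException cause, long timestamp). */
        void nack(Exception cause) {
            error = cause;
            outcome = Outcome.NACKED;
            done.countDown();
        }

        /** Blocks the publishing thread until a callback arrives or the timeout expires. */
        Outcome await(long timeout, TimeUnit unit) throws InterruptedException {
            done.await(timeout, unit);
            return outcome; // still PENDING if the timeout expired first
        }

        Exception error() {
            return error;
        }
    }

    class ConfirmationDemo {
        public static void main(String[] args) throws Exception {
            PublishConfirmation confirmation = new PublishConfirmation();
            // Stand-in for the API's dispatcher thread delivering a NACK.
            Thread broker = new Thread(
                    () -> confirmation.nack(new IllegalStateException("403: Publish ACL Denied")));
            broker.start();
            PublishConfirmation.Outcome result = confirmation.await(5, TimeUnit.SECONDS);
            System.out.println(result + ": " + confirmation.error().getMessage());
            broker.join();
        }
    }
    ```

    The important part is that the publishing code calls await(...) before the Session is closed, and treats PENDING after the timeout as "unknown" rather than as success.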

    Typically, most apps connect when they start up, leave the connection up for the lifetime of the application, and send/receive many messages on that Session. Creating a brand new Session for every single message would be very slow, performance-wise, due to the cost of establishing a new TCP session, TLS, and then the SMF protocol. So: move all your connection logic out of this method, keep the Session and Producer as instance variables (or static class variables?) and have the sendMessage() method just reference them.
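
    The shape of that refactoring, sketched in plain Java without the JCSMP classes: `BrokerLink` is a hypothetical stand-in for the connected Session plus its Producer, and `EventPublisher` is a made-up long-lived object (for example a Spring singleton) that owns it. This version connects lazily on the first send instead of at startup, which is a simplification; connecting in an init method would work the same way.

    ```java
    import java.util.concurrent.atomic.AtomicInteger;
    import java.util.function.Supplier;

    /** Hypothetical stand-in for a connected session plus its producer. */
    interface BrokerLink {
        void send(String topic, String payload);
        void close();
    }

    /** Long-lived publisher: connects once, then reuses the same link for every send. */
    class EventPublisher implements AutoCloseable {
        private final Supplier<BrokerLink> connector;
        private BrokerLink link; // guarded by "this"

        EventPublisher(Supplier<BrokerLink> connector) {
            this.connector = connector;
        }

        synchronized void sendMessage(String topic, String payload) {
            if (link == null) {
                link = connector.get(); // the expensive part: TCP, TLS, protocol handshake
            }
            link.send(topic, payload);
        }

        /** Call once, when the application shuts down. */
        @Override
        public synchronized void close() {
            if (link != null) {
                link.close();
                link = null;
            }
        }
    }

    class EventPublisherDemo {
        public static void main(String[] args) {
            AtomicInteger connects = new AtomicInteger();
            AtomicInteger sends = new AtomicInteger();
            Supplier<BrokerLink> connector = () -> {
                connects.incrementAndGet();
                return new BrokerLink() {
                    @Override public void send(String topic, String payload) { sends.incrementAndGet(); }
                    @Override public void close() { }
                };
            };
            try (EventPublisher publisher = new EventPublisher(connector)) {
                for (int i = 0; i < 1000; i++) {
                    publisher.sendMessage("demo/topic", "event " + i);
                }
            }
            System.out.println(connects.get() + " connection, " + sends.get() + " sends"); // 1 connection, 1000 sends
        }
    }
    ```

    Because the link outlives each sendMessage() call, the callback handler registered on it stays alive too, so ACKs and NACKs that arrive after the method returns still have somewhere to go.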