When a message is published to the incorrect topic or encounters a permission issue, no exception is raised. Although JCSMPStreamingPublishCorrelatingEventHandler has been implemented, the handleErrorEx and responseReceivedEx methods are not being invoked, making it difficult to track missing messages.
import java.io.IOException;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.ibsplc.iloyal.plugins.projectspecific.config.EventsConfiguration;
import com.ibsplc.iloyal.plugins.projectspecific.dto.ExternalConfigurationDTO;
import com.ibsplc.iloyal.plugins.projectspecific.dto.JMSHeaderDTO;
import com.ibsplc.xibase.util.log.Log;
import com.ibsplc.xibase.util.log.factory.LogFactory;
import com.solacesystems.jcsmp.BytesXMLMessage;
import com.solacesystems.jcsmp.DeliveryMode;
import com.solacesystems.jcsmp.InvalidPropertiesException;
import com.solacesystems.jcsmp.JCSMPException;
import com.solacesystems.jcsmp.JCSMPFactory;
import com.solacesystems.jcsmp.JCSMPProperties;
import com.solacesystems.jcsmp.JCSMPSession;
import com.solacesystems.jcsmp.JCSMPStreamingPublishCorrelatingEventHandler;
import com.solacesystems.jcsmp.JCSMPTransportException;
import com.solacesystems.jcsmp.SDTException;
import com.solacesystems.jcsmp.SDTMap;
import com.solacesystems.jcsmp.TextMessage;
import com.solacesystems.jcsmp.Topic;
import com.solacesystems.jcsmp.XMLMessageProducer;
public class EventsConfirmedPublish {
private static final Log LOGGER = LogFactory.getLogger(EventsConfirmedPublish.class.getName());
private EventsConfiguration eventsConfiguration;
final int count = 5;
final CountDownLatch latch = new CountDownLatch(count); // used for Synchronizing b/w threads
/**
* A correlation structure. This structure is passed back to the publisher
* callback when the message is acknowledged or rejected
*/
class MsgInfo {
public volatile boolean acked = false;
public volatile boolean publishedSuccessfully = false;
public BytesXMLMessage sessionIndependentMsg = null;
public final long id;
public MsgInfo(long id) {
this.id = id;
LOGGER.log(Log.FINE, "[FRM] MsgInfo acked from constructor - {}", acked);
}
@Override
public String toString() {
return String.format("Message ID: %d, Published Confirmation: %b, Published Successfully: %b", id, acked,
publishedSuccessfully);
}
}
class PubCallback implements JCSMPStreamingPublishCorrelatingEventHandler {
@Override
public void handleErrorEx(Object arg0, JCSMPException arg1, long arg2) {
if (arg0 instanceof MsgInfo) {
MsgInfo i = (MsgInfo) arg0;
i.acked = true;
LOGGER.log(Log.SEVERE, "Message response (rejected) received for {}, error was {} \n", i, arg1 );
} else {
LOGGER.log(Log.FINE, "Received error but correlation object is not of type MsgInfo");
}
latch.countDown();
}
@Override
public void responseReceivedEx(Object arg0) {
if (arg0 instanceof MsgInfo) {
MsgInfo i = (MsgInfo) arg0;
i.acked = true;
i.publishedSuccessfully = true;
LOGGER.log(Log.FINE, "Message response (accepted) received for {} \n", i);
} else {
LOGGER.log(Log.FINE, "Received response but correlation object is not of type MsgInfo");
}
latch.countDown();
}
}
public void sendMessage(JMSHeaderDTO jmsHeaderDTO, String outGoingMsg)
throws JCSMPTransportException, JCSMPException, IOException {
ExternalConfigurationDTO externalConfigs = eventsConfiguration.getConfigurations();
final LinkedList<MsgInfo> msgList = new LinkedList<MsgInfo>();
JCSMPSession session = null;
try {
LOGGER.log(Log.FINE, "ConfirmedPublish initializing...");
// Create a JCSMP Session
final JCSMPProperties properties = new JCSMPProperties();
properties.setProperty(JCSMPProperties.USERNAME, externalConfigs.getSolaceConfigs().getSolaceUsername());
properties.setProperty(JCSMPProperties.PASSWORD, externalConfigs.getSolaceConfigs().getSolacePassword());
properties.setProperty(JCSMPProperties.HOST, externalConfigs.getSolaceConfigs().getSolaceHost());
properties.setProperty(JCSMPProperties.VPN_NAME, externalConfigs.getSolaceConfigs().getSolaceVpn());
session = JCSMPFactory.onlyInstance().createSession(properties);
session.connect();
LOGGER.log(Log.FINE, "JCSMP session connected");
final Topic solaceTopic = JCSMPFactory.onlyInstance().createTopic(jmsHeaderDTO.getDestination());
final XMLMessageProducer producer = session.getMessageProducer(new PubCallback());
LOGGER.log(Log.FINE, "About to send message to Topic - {}", solaceTopic.getName());
TextMessage message = createTextMessage(jmsHeaderDTO, outGoingMsg, externalConfigs);
// The application will wait and confirm the message is published
// successfully.
// In this case, wrap the message in a MsgInfo instance, and
// use it as a correlation key.
for (int i=1; i<=count; i++) {
final MsgInfo msgCorrelationInfo = new MsgInfo(count);
msgCorrelationInfo.sessionIndependentMsg = message;
msgList.add(msgCorrelationInfo);
// Set the message's correlation key. This reference
// is used when calling back to responseReceivedEx().
message.setCorrelationKey(msgCorrelationInfo);
// **Important** Set delivery mode to PERSISTENT for guaranteed messaging
message.setDeliveryMode(DeliveryMode.PERSISTENT);
message.setAckImmediately(true);
LOGGER.log(Log.FINE, "Message - count {} - {}", i, message.toString());
producer.send(message, solaceTopic);
}
LOGGER.log(Log.FINE, "Message Send. Processing replies");
try {
latch.await();
} catch (InterruptedException ex) {
LOGGER.log(Log.SEVERE, "[FRM] I was awoken while awaiting");
}
//Process the replies
while (msgList.peek() != null) {
final MsgInfo ackedMsgInfo = msgList.poll();
LOGGER.log(Log.FINE, "Removing acknowledged message {} from application list.\n", ackedMsgInfo);
}
LOGGER.log(Log.FINE, "Message successfully published to Solace topic : {}", solaceTopic);
}
catch (InvalidPropertiesException ex) {
LOGGER.log(Log.SEVERE, "InvalidPropertiesException occured while connecting to JCSMPSession session. {}",
ex.getMessage());
throw new InvalidPropertiesException(
"InvalidPropertiesException occured while connecting to JCSMPSession session. {}", ex);
} catch (JCSMPException ex) {
LOGGER.log(Log.SEVERE, "JCSMPException occured while publishing event data. {}", ex.getMessage());
throw new JCSMPException("JCSMPException occured while publishing event data. {}", ex);
} finally {
if (session != null) {
session.closeSession();
}
}
}
private TextMessage createTextMessage(JMSHeaderDTO jmsHeaderDTO, String outGoingMsg,
ExternalConfigurationDTO externalConfigs) throws SDTException {
TextMessage message = JCSMPFactory.onlyInstance().createMessage(TextMessage.class);
// Set message headers
SDTMap map = JCSMPFactory.onlyInstance().createMap();
map.putString(externalConfigs.getJmsHeaderKeys().getJmsCorrelation(), jmsHeaderDTO.getCorrelationID());
map.putString(externalConfigs.getJmsHeaderKeys().getJmsContentType(), jmsHeaderDTO.getDataContentType());
map.putString(externalConfigs.getJmsHeaderKeys().getJmsDestination(), jmsHeaderDTO.getDestination());
map.putString(externalConfigs.getJmsHeaderKeys().getJmsEvent(), jmsHeaderDTO.getEvent());
map.putString(externalConfigs.getJmsHeaderKeys().getJmsEventId(), jmsHeaderDTO.getEventId());
message.setProperties(map);
message.setText(outGoingMsg);
message.setHTTPContentType(jmsHeaderDTO.getDataContentType());
return message;
}
}
Please advise