I am a test engineer and am currently testing the Solace PubSub+ event broker software to make sure it fits my company’s needs. I am trying to prove that persistent (guaranteed) messaging works properly. The scenario I am testing is a failure of either the Solace broker itself or one of its message VPNs. If the producer sends its messages as persistent, how can I prove from the client (producer) side, assuming Solace is down or I don’t have access to it, that the undelivered messages are still being stored?
Here is an excerpt from the SolaceProducerTask.java class, which sends the messages to the broker:
`
        while (System.currentTimeMillis() <= elapsedDuration) {
            msg = JCSMPFactory.onlyInstance().createMessage(TextMessage.class);
            msg.setDeliveryMode(DeliveryMode.PERSISTENT);
            msgId++;
            try {
                // threadId|msgId|publishDateString|publishTimeMillis|publishTimeNano|guid|additionalPayload
                String text = "" + this.scenarioId + "_" + threadId + "|" +
                        msgId + "|" +
                        LocalDateTime.now().toString() + "|" +
                        System.currentTimeMillis() + "|" +
                        System.nanoTime() + "|" +
                        MessageData.getData(payloadSizeKB);
                msg.setText(text);
                // Measure how long the send() call itself takes.
                long sendStartMillis = System.currentTimeMillis();
                long sendStartNano = System.nanoTime();
                producer.send(msg, destination);
                long sendEndMillis = System.currentTimeMillis();
                long sendEndNano = System.nanoTime();
                // threadId|msgId|publishDateString|publishTimeMillis|publishTimeNano|guid|publishLatencyMillis|publishLatencyNano
                // Log everything before the padding payload (a run of 'A' characters), plus the latencies.
                logsQueue.offer(text.substring(0, text.indexOf("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA")) + (sendEndMillis - sendStartMillis) + "|" + (sendEndNano - sendStartNano) + "|" + msg.getMessageId());
            } catch (JCSMPException e) {
                // Log the failed send in the same pipe-delimited layout, with zero latencies.
                String errorString = "" + this.scenarioId + "_" + threadId + "|" +
                        msgId + "|" +
                        LocalDateTime.now().toString() + "|" +
                        System.currentTimeMillis() + "|" +
                        System.nanoTime() + "|" +
                        MessageData.getGUID() + "|0|0|" +
                        e.getMessage() + " - " + e.getCause();
                logsQueue.offer(errorString);
                run = false;
                if (producer != null) {
                    producer.close();
                    producer = null;
                    log.info(LocalDateTime.now().toString() + ":[PRODUCER_CLOSED]");
                }
                if (session != null) {
                    session.closeSession();
                    session = null;
                    log.info(LocalDateTime.now().toString() + ":[SESSION_CLOSED]");
                }
                // Stop publishing: producer is now null, so another iteration would throw a NullPointerException.
                break;
            }
            try {
                Thread.sleep(intervalMillis);
            } catch (InterruptedException e) {
                log.info(e.getCause() + "\n" + e.getMessage());
                // Restore the interrupt flag and stop, so the thread can shut down cleanly.
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
}
`
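To make "still being stored" something I can actually assert on from the producer side, one idea is to keep my own record of every message handed to `send()` until the broker confirms it. As far as I understand, a persistent message only counts as guaranteed once the broker has acknowledged it, so anything unacknowledged has to be retained by the publisher. Below is a minimal sketch of that bookkeeping in plain Java. `PendingMessageTracker` and its methods are hypothetical names of mine, not part of the Solace API, and the wiring into JCSMP's publish event handler callbacks (I am assuming `responseReceivedEx` and `handleErrorEx` on `JCSMPStreamingPublishCorrelatingEventHandler`, with `msgId` set as the correlation key) is an assumption I have not verified.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical helper (not part of the Solace API): remembers every message
// handed to send() until the broker acknowledges or rejects it.
final class PendingMessageTracker {
    // msgId -> message text still waiting for a broker acknowledgement
    private final Map<Long, String> pending = new ConcurrentHashMap<>();
    // msgId -> message text the broker rejected; kept so it can be resent
    private final Map<Long, String> failed = new ConcurrentHashMap<>();

    // Call just before producer.send(msg, destination).
    void onSend(long msgId, String text) {
        pending.put(msgId, text);
    }

    // Call from the acknowledgement callback (assumed wiring).
    void onAck(long msgId) {
        pending.remove(msgId);
    }

    // Call from the error callback (assumed wiring).
    void onNack(long msgId) {
        String text = pending.remove(msgId);
        if (text != null) {
            failed.put(msgId, text);
        }
    }

    int pendingCount() {
        return pending.size();
    }

    // Sorted ids of messages the broker has not yet confirmed.
    List<Long> pendingIds() {
        List<Long> ids = new ArrayList<>(pending.keySet());
        Collections.sort(ids);
        return ids;
    }

    // Sorted ids of messages the broker rejected.
    List<Long> failedIds() {
        List<Long> ids = new ArrayList<>(failed.keySet());
        Collections.sort(ids);
        return ids;
    }

    // Small demo: three sends, one acknowledged, one rejected, one still pending.
    public static void main(String[] args) {
        PendingMessageTracker tracker = new PendingMessageTracker();
        tracker.onSend(1L, "first");
        tracker.onSend(2L, "second");
        tracker.onSend(3L, "third");
        tracker.onAck(1L);
        tracker.onNack(2L);
        System.out.println("pending: " + tracker.pendingIds());
        System.out.println("failed:  " + tracker.failedIds());
    }
}
```

With something like this in place, the test for a broker or message VPN outage could compare `pendingIds()` and `failedIds()` on the client against what the consumer eventually receives after the broker comes back. Whether that matches how JCSMP actually behaves during an outage is exactly what I would like confirmed.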