I am able to successfully send and receive topic messages via Spring JMS but the messages are not being received by a normal Topic subscriber.
My jms code for sending and receiving is below:
@Autowired
private JmsTemplate jmsTemplate;
public void run(String... strings) throws Exception {
for( int i = 0 ; i < 100; i++) {
String msg = "Hello World" + i;
jmsTemplate.setPubSubDomain(true);
//jmsTemplate.setdu
logger.info("============= Sending " + msg);
this.jmsTemplate.convertAndSend("abc/topic2", msg);
Thread.sleep(1000);
}
}
}
@Component
static class MessageHandler {
private static final Logger logger = LoggerFactory.getLogger(MessageHandler.class);
@JmsListener(destination = "abc/topic2", containerFactory = "cFactory", concurrency = "2")
public void processMsg(Message<?> msg) {
// Works fine, no issues.
StringBuffer msgAsStr = new StringBuffer("============= Received \nHeaders:");
MessageHeaders hdrs = msg.getHeaders();
msgAsStr.append("\nUUID: "+hdrs.getId());
msgAsStr.append("\nTimestamp: "+hdrs.getTimestamp());
Iterator<String> keyIter = hdrs.keySet().iterator();
while (keyIter.hasNext()) {
String key = keyIter.next();
msgAsStr.append("\n"+key+": "+hdrs.get(key));
}
msgAsStr.append("\nPayload: "+msg.getPayload());
logger.info(msgAsStr.toString());
}
public CachingConnectionFactory producerCachingConnectionFactory() throws Exception {
CachingConnectionFactory ccf = new CachingConnectionFactory(connectionFactory());
ccf.setSessionCacheSize(1);
return ccf;
}
@Bean
public JmsTemplate jmsTemplate() throws Exception {
// CachingConnectionFactory ccf = new CachingConnectionFactory(cf);
JmsTemplate jsmTemplate = new JmsTemplate(producerCachingConnectionFactory() );
jsmTemplate.setPubSubDomain(true);
jsmTemplate.setDeliveryPersistent(false); // tried both true and false.
//jsmTemplate.setSessionTransacted(true); // I get an exception ... isnt supported with Direct transport.
return jsmTemplate;
}
@Bean
public ConnectionFactory connectionFactory() throws Exception {
Hashtable<String, Object> env = new Hashtable<>();
env.put(InitialContext.PROVIDER_URL, jndiProviderURL);
env.put(Context.SECURITY_PRINCIPAL, username);
env.put(Context.SECURITY_CREDENTIALS, password);
System.setProperty("java.security.krb5.conf", KRB5_LOC);
System.setProperty("java.security.auth.login.config", JAAS_LOC);
env.put(SupportedProperty.SOLACE_JMS_AUTHENTICATION_SCHEME,
SupportedProperty.AUTHENTICATION_SCHEME_GSS_KRB);
env.put(SupportedProperty.SOLACE_JMS_KRB_SERVICE_NAME, "HOST");
env.put(SupportedProperty.SOLACE_JMS_VPN, vpn);
env.put(SupportedProperty.SOLACE_JMS_DYNAMIC_DURABLES, false);
env.put(SupportedProperty.SOLACE_JMS_DELIVER_TO_ONE_OVERRIDE , false);
SolConnectionFactory cf = SolJmsUtility.createConnectionFactory(env );
return cf;
}
@Bean
public DefaultJmsListenerContainerFactory cFactory(ConnectionFactory connectionFactory, DemoErrorHandler errorHandler) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setErrorHandler(errorHandler);
factory.setPubSubDomain(true);
factory.setSubscriptionDurable(false);
return factory;
}
I wonder if you can spot some obvious issue here please?
Many Thanks,
Ali