Solace Callback Message Operation Track - NodeJs +TypeScript
We are trying to build the condition/decision based on Solace Return message and provide the output to the endpoint JSON message as an API output. I tried to with multiple different options but unable to control the CallBack event message.
Could you please guide/share me if there any kind of knowledge base/sample github codebase present.
Programming Language: Node.JS with TypeScript. Code Deployed: Azure Function
Used this code during implementation: https://github.com/SolaceSamples/solace-samples-nodejs
Best Answer
-
There are two things wrong with the
SendMessageToSolaceTEST
function you have provided.First, you declare a local variable
connectStatus
which is used in your return statement at the bottom, but the Promise callbacks you have supplied are setting the value ofthis.connectStatus
which means the local variable is never assigned any value other than""
.Secondly, you are mix and matching async-await coding with Promise-based patterns. It would be better if the function works with one pattern or the other. As-written, you are not waiting for the
solaceClient.connect()
call to complete. Rather, the connect is triggered, and the function will return immediately.Using async-await style, this function should look more like:
const SendMessageToSolaceTEST = async function (): Promise<string> { this.solaceClient = new SolaceClient(); let connectStatus; try { await this.solaceClient.Connect(); // wait for the connection to finish connectStatus = "Connected to Solace!"; // assign the local variable, not the class-level one } catch (error) { connectStatus = `${error}!`; } return "sibendu" + connectStatus; }
I have not yet looked at the SolaceClient.ts file.
1
Answers
-
Hi @Sibendu - The interaction is asynchronous, hence a registered callback on the consumer will be invoked in general.
I would recommend checking out the NodeJS samples for topic-based guaranteed publisher & subscriber here.
https://github.com/SolaceSamples/solace-samples-javascript/tree/master/src/basic-samples
For the same functionality on a queue based activity
https://github.com/SolaceSamples/solace-samples-javascript/tree/master/src/features
Hope that helps - let us know.
1 -
Thank you @Giri for the details.
As I mentioned in my post, we are using Azure Function(node js with typescript) with HTTP trigger. There is no such consumer to Call event trigger manually as Azure function provide and only provide the REST API endpoint.
I saw some article on this and its perfectly match on browser application where event trigger manually and its works.
Problem Statement:
Solace return the message as call back event not Method Return Type. All the method they have in void return type. So. There is no option to track this call event listener in Azure Function.
Node Js azure function can not wait for this callback event and executed on his own way(return response details ) and after that callback event executed. We need some kind of message return type and based on that we can make some decision.
I have gone through the Solace C# libraries which have a functionality to method return type and based on that we can some decision(C# Code URL). We need similar kind of functionality in node.js which is not present.
As per requirement, we want to check every step status message(like connection issue, Message Delivery, Failure Message delivery etc) and based on that we will provide the Error or success message in Rest API output.
Thank you in advance!
0 -
@Sibendu I think, I understand what you are trying to do. Unfortunately, the NodeJS (JS) libraries are built based on asynchronous invocation i.e., a callback based approach (unlike C# example that you also pointed out).
A blocking call is not available in the NodeJS API.
Can you look into Azure Function Consumer/Producer - Integration Hub.
Also, there is a codelab on how to use the consumer - Stream data from Azure Service Bus to Solace PubSub+ Event broker Using Azure Function
please check it out.
0 -
Hi, Not using nodejs - the other approach is to directly do a REST based calls that can get you the final result success (200) or other errors (including timeout). I hope other members may have ideas, I will circle back if I hear other options.
0 -
Hi @Sibendu - can you explain what you are looking to achieve a little bit more? Are you looking to publish a message within the azure function or subscribe to a message and exit once a message is received? Also are you looking for a request/response pattern over Solace? Ultimately you will need to wrap a callback in a promise to achieve what you are looking for.
0 -
Thank you for your response.
I am trying to send a message to topic from Azure Function. The azure function is developing using node.js and type script.
We want to send message to different different topic based on condition and we want to execute codebase for each decision.
Let take an example, lets assume that I have passed my incorrect usename and password. During Connect to solace it should return me error or success. If fails, it will return message to end point as API response. if Success, Then only I will proceed to to next steps to Send message to topic.
During Send, if any issue occurred like Invalid topic name, message not received, success or any configuration issue in solace. It should be return to end point as API response.
This api will be used as high performance API and can't wait for long running response time.
Please let know if you need any other information.
0 -
@Sibendu - If the Azure Function is meant to receive an HTTP message and then publish it to the broker, have you considered using the built-in REST messaging feature? A Solace broker can act as a REST endpoint and automatically receive a message from a POST request. Failures caused will result in the HTTP request responding with an error code, as you describe.
If you need more advanced capabilities than what the built-in feature allows (such as custom validation or data transformation), you can leverage the fact that Node.js Azure functions are async by default.
Although the Solace API uses callbacks, these can be wrapped as Promises, and then used from within your HTTP handler. For example, let's say you want to connect and send asynchronously:(Example Code - not tested)
const { app } = require('@azure/functions'); app.http('helloWorld1', { route: 'hello/world', handler: async (request, context) => { // create session // const session = ... // const message = ... // let result = ... try { const connectSessionEvent = await sessionConnect(session); const publishResult = await publishMessage(session, message); // restult = { status: 'OK' }; or similar... } catch (err) { // do something // result = { status: 'FAILED', reason: '...' }; or similar... } return result; } }); // Connect function sessionConnect(session) { return new Promise((resolve, reject) => { session.on(solace.SessionEventCode.UP_NOTICE, resolve); session.on(solace.SessionEventCode.CONNECT_FAILED_ERROR, reject); // -- more events can be subscribed to... try { session.connect(); } catch (err) { reject(err); } }); } // Publish function publishMessage(session, message) { return new Promise((resolve, reject) => { const correlationKey = Symbol(); message.setCorrelationKey(correlationKey); session.once(solace.SessionEventCode.ACKNOWLEDGED_MESSAGE, resolve); session.once(solace.SessionEventCode.REJECED_MESSAGE_ERROR, reject); session.send(message); }); }
0 -
@nicholasdgoodman Thank you for response and details level explanation.
I have already tried with Promise and Resolve option and try to connect. I followed this code (GIT HUB) and tried to built the functionality. But problem is if Solace connection failed, it not throwing exception and error message comes from callback listener but azure function can't wait for response from call listener and its return his own response type.
I put 5 sec sleep for getting the data from callback listener but still not work.
Let me try with once more with your example.
0 -
You may also want to look into adjusting the default settings of the Solace connection properties. In many cases, the Solace JavaScript client will attempt to connect and/or reconnect to a broker several times (with built-in delays) before it determines the connection has failed and raising the connection failed error.
You could look at setting the
connectRetries
andreconnectRetries
settings 1 or 0 so that if the Azure Function cannot connect to the broker on the first try, it replies to the HTTP requestor with an error.0 -
Yes. you are correct. I tried with 1 retries and 500 ms but not getting desire result..
Here is the code I tried.
async Connect():Promise<any> {
return new Promise<any>((resolve, reject) => {
console.log(this.session);
if (this.session !== null && this.session !== undefined) {
this.Log("Already connected and ready to subscribe.");
reject();
}
// if there's no session, create one with the properties imported from the game-config file
try {
if (solaceConf.HostName.indexOf("tcps") != 0) {
reject("HostUrl must be the TCPS Endpoint that begins with either tcps://. Please check your Host Url!");
} const sessionProperties = new solace.SessionProperties({
url: `${solaceConf.HostName}:${solaceConf.PortNumber}`,
vpnName: `${solaceConf.VPNName}`,
userName: `${solaceConf.UserName}`,
password: `${solaceConf.Password}`,
publisherProperties: {
acknowledgeMode: solace.MessagePublisherAcknowledgeMode.PER_MESSAGE,
},
connectTimeoutInMsecs: 500,
connectRetries: 1
});
this.session = solace.SolclientFactory.createSession(sessionProperties);
} catch (error) {
this.Log(error.toString());
throw error;
}
// define session event listeners
//The UP_NOTICE dictates whether the session has been established
this.session.on(solace.SessionEventCode.UP_NOTICE, sessionEvent => {
this.Log("=== Successfully connected and ready to subscribe. ===");
resolve("Success");
});
//The CONNECT_FAILED_ERROR implies a connection failure
this.session.on(solace.SessionEventCode.CONNECT_FAILED_ERROR, sessionEvent => {
this.Log("Connection failed to the message router: " + sessionEvent.message + " - check correct parameter values and connectivity!");
if (this.errorMessageReceivedCallback) {
this.errorMessageReceivedCallback(sessionEvent.message);
}
reject(`failed`);
});
//DISCONNECTED implies the client was disconnected
this.session.on(solace.SessionEventCode.DISCONNECTED, sessionEvent => {
this.Log("Disconnected.");
if (this.session !== null) {
this.session.dispose();
//this.subscribed = false;
this.session = null;
}
});
//ACKNOWLEDGED MESSAGE implies that the broker has confirmed message receipt
this.session.on(solace.SessionEventCode.ACKNOWLEDGED_MESSAGE, sessionEvent => {
this.Log("Delivery of message with correlation key = " + sessionEvent.correlationKey + " confirmed.");
});
//REJECTED_MESSAGE implies that the broker has rejected the message
this.session.on(solace.SessionEventCode.REJECTED_MESSAGE_ERROR, sessionEvent => {
this.Log("Delivery of message with correlation key = " + sessionEvent.subcode + " rejected, info: " + sessionEvent.message);
});
//SUBSCRIPTION ERROR implies that there was an error in subscribing on a topic
this.session.on(solace.SessionEventCode.SUBSCRIPTION_ERROR, sessionEvent => {
this.Log("Cannot subscribe to topic: " + sessionEvent.subcode);
//remote the topic from the TopicSubscriptionMap
//this.topicSubscriptions.delete(sessionEvent.correlationKey);
});
//SUBSCRIPTION_OK implies that a subscription was succesfully applied/removed from the broker
this.session.on(solace.SessionEventCode.SUBSCRIPTION_OK, sessionEvent => {
this.Log(`Session co-relation-key for event: ${sessionEvent.correlationKey}`);
var key = (sessionEvent.correlationKey) as CorrelationKey;
//Check if the topic exists in the map
if (this.topicSubscriptions.get(key.Key)) {
//If the subscription shows as subscribed, then this is a callback for unsubscripition
if (this.topicSubscriptions.get(key.Key).isSubscribed) {
//Remove the topic from the map
this.topicSubscriptions.delete(key.Key);
this.Log(`Successfully unsubscribed from topic: ${sessionEvent.correlationKey}`);
} else {
//Otherwise, this is a callback for subscribing
this.topicSubscriptions.get(key.Key).isSubscribed = true;
this.Log(`Successfully subscribed to topic: ${sessionEvent.correlationKey}`);
}
}
});
//Message callback function
this.session.on(solace.SessionEventCode.MESSAGE, message => {
//Get the topic name from the message's destination
let topicName: string = message.getDestination().getName();
//Iterate over all subscriptions in the subscription map
for (let sub of Array.from(this.topicSubscriptions.keys())) {
//Replace all * in the topic filter with a .* to make it regex compatible
let regexdSub = sub.replace(/\*/g, ".*");
//if the last character is a '>', replace it with a .* to make it regex compatible
if (sub.lastIndexOf(">") == sub.length - 1) regexdSub = regexdSub.substring(0, regexdSub.length - 1).concat(".*");
let matched = topicName.match(regexdSub);
//if the matched index starts at 0, then the topic is a match with the topic filter
if (matched && matched.index == 0) {
//Edge case if the pattern is a match but the last character is a *
if (regexdSub.lastIndexOf("*") == sub.length - 1) {
//Check if the number of topic sections are equal
if (regexdSub.split("/").length != topicName.split("/").length) return;
}
//Proceed with the message callback for the topic subscription if the subscription is active
if (this.topicSubscriptions.get(sub).isSubscribed && this.topicSubscriptions.get(sub).callback != null) console.log(`Got callback for ${sub}`);
this.topicSubscriptions.get(sub).callback(message);
}
}
});
// connect the session
try {
var connect = this.session.connect();
console.log("TEST COnnect"+connect);
resolve("success");
} catch (error) {
this.Log("error"+error.toString());
reject(`failed`);
throw error;
}
});}
0 -
One thing that stands out is that you should not be resolving the promise at the end of the
async Connect
function: the only place theresolve(..)
should be invoked is in theUP_NOTICE
event handler.As written, the connect function will resolve immediately (and synchronously) and any code that comes after it will likewise execute before the connection is ready.
await Connect(); // this is currently going to resolve immediately // this line will execute before the connection is established //most other session operations will likely fail
The fix is to simply remove the unnecessary call at the bottom:
// connect the session try { var connect = this.session.connect(); console.log("TEST COnnect" + connect); // resolve("success"); <--- Do NOT resolve here } catch (error) { this.Log("error" + error.toString()); reject(`failed`); throw error; }
0 -
Yes. I tried but not received desired result. Async not wait for Listener to resolve. Not tracked the result for connectivity and its sent it as undefined.
Solace Client.ts
async Connect():Promise<any> {
return new Promise<any>((resolve, reject) => {
console.log(this.session);
if (this.session !== null && this.session !== undefined) {
this.Log("Already connected and ready to subscribe Sibendu.");
reject();
}
// if there's no session, create one with the properties imported from the game-config file
try {
if (solaceConf.HostName.indexOf("tcps") != 0) {
reject("HostUrl must be the TCPS Endpoint that begins with either tcps://. Please check your Host Url!");
} const sessionProperties = new solace.SessionProperties({
url: `${solaceConf.HostName}:${solaceConf.PortNumber}`,
vpnName: `${solaceConf.VPNName}`,
userName: `${solaceConf.UserName}`,
password: `${solaceConf.Password}`,
publisherProperties: {
acknowledgeMode: solace.MessagePublisherAcknowledgeMode.PER_MESSAGE,
},
connectTimeoutInMsecs: 500,
connectRetries: 1
});
this.session = solace.SolclientFactory.createSession(sessionProperties);
} catch (error) {
this.Log(error.toString());
throw error;
}
// define session event listeners
//The UP_NOTICE dictates whether the session has been established
this.session.on(solace.SessionEventCode.UP_NOTICE, sessionEvent => {
this.Log("=== Successfully connected and ready to subscribe. ===");
resolve("Success");
});
//The CONNECT_FAILED_ERROR implies a connection failure
this.session.on(solace.SessionEventCode.CONNECT_FAILED_ERROR, sessionEvent => {
this.Log("Connection failed to the message router: " + sessionEvent.message + " - check correct parameter values and connectivity!"); reject(`failed`);
});
//DISCONNECTED implies the client was disconnected
this.session.on(solace.SessionEventCode.DISCONNECTED, sessionEvent => {
this.Log("Disconnected.");
if (this.session !== null) {
this.session.dispose();
//this.subscribed = false;
this.session = null;
}
});
//ACKNOWLEDGED MESSAGE implies that the broker has confirmed message receipt
this.session.on(solace.SessionEventCode.ACKNOWLEDGED_MESSAGE, sessionEvent => {
this.Log("Delivery of message with correlation key = " + sessionEvent.correlationKey + " confirmed.");
});
//REJECTED_MESSAGE implies that the broker has rejected the message
this.session.on(solace.SessionEventCode.REJECTED_MESSAGE_ERROR, sessionEvent => {
this.Log("Delivery of message with correlation key = " + sessionEvent.subcode + " rejected, info: " + sessionEvent.message);
});
//SUBSCRIPTION ERROR implies that there was an error in subscribing on a topic
this.session.on(solace.SessionEventCode.SUBSCRIPTION_ERROR, sessionEvent => {
this.Log("Cannot subscribe to topic: " + sessionEvent.subcode);
//remote the topic from the TopicSubscriptionMap
//this.topicSubscriptions.delete(sessionEvent.correlationKey);
});
//SUBSCRIPTION_OK implies that a subscription was succesfully applied/removed from the broker
this.session.on(solace.SessionEventCode.SUBSCRIPTION_OK, sessionEvent => {
this.Log(`Session co-relation-key for event: ${sessionEvent.correlationKey}`);
var key = (sessionEvent.correlationKey) as CorrelationKey;
//Check if the topic exists in the map
if (this.topicSubscriptions.get(key.Key)) {
//If the subscription shows as subscribed, then this is a callback for unsubscripition
if (this.topicSubscriptions.get(key.Key).isSubscribed) {
//Remove the topic from the map
this.topicSubscriptions.delete(key.Key);
this.Log(`Successfully unsubscribed from topic: ${sessionEvent.correlationKey}`);
} else {
//Otherwise, this is a callback for subscribing
this.topicSubscriptions.get(key.Key).isSubscribed = true;
this.Log(`Successfully subscribed to topic: ${sessionEvent.correlationKey}`);
}
}
});
//Message callback function
this.session.on(solace.SessionEventCode.MESSAGE, message => {
//Get the topic name from the message's destination
let topicName: string = message.getDestination().getName();
//Iterate over all subscriptions in the subscription map
for (let sub of Array.from(this.topicSubscriptions.keys())) {
//Replace all * in the topic filter with a .* to make it regex compatible
let regexdSub = sub.replace(/\*/g, ".*");
//if the last character is a '>', replace it with a .* to make it regex compatible
if (sub.lastIndexOf(">") == sub.length - 1) regexdSub = regexdSub.substring(0, regexdSub.length - 1).concat(".*");
let matched = topicName.match(regexdSub);
//if the matched index starts at 0, then the topic is a match with the topic filter
if (matched && matched.index == 0) {
//Edge case if the pattern is a match but the last character is a *
if (regexdSub.lastIndexOf("*") == sub.length - 1) {
//Check if the number of topic sections are equal
if (regexdSub.split("/").length != topicName.split("/").length) return;
}
//Proceed with the message callback for the topic subscription if the subscription is active
if (this.topicSubscriptions.get(sub).isSubscribed && this.topicSubscriptions.get(sub).callback != null) console.log(`Got callback for ${sub}`);
this.topicSubscriptions.get(sub).callback(message);
}
}
});
// connect the session
try {
var connect = this.session.connect();
console.log("TEST COnnect"+connect);
} catch (error) {
this.Log("Test Error"+error.toString());
throw error;
}
}); }Here is YourClass.ts file which is called to SolaceClient.ts
export class YourClass {
private solaceClient:SolaceClient;constructor()
{}
SendMessageToSolaceTEST = async function () : Promise<string> {
this.solaceClient = new SolaceClient();
let connectStatus ="";
this.solaceClient
.Connect()
.then(() => {
this.connectStatus = "Connected to Solace!";
})
.catch(error => {
this.connectStatus = `${error}!`;
});
return "sibendu"+connectStatus;
}
}
Then Azure function called my YourClass.ts file. I am getting undefined runtime and log generate later.
0 -
There are two things wrong with the
SendMessageToSolaceTEST
function you have provided.First, you declare a local variable
connectStatus
which is used in your return statement at the bottom, but the Promise callbacks you have supplied are setting the value ofthis.connectStatus
which means the local variable is never assigned any value other than""
.Secondly, you are mix and matching async-await coding with Promise-based patterns. It would be better if the function works with one pattern or the other. As-written, you are not waiting for the
solaceClient.connect()
call to complete. Rather, the connect is triggered, and the function will return immediately.Using async-await style, this function should look more like:
const SendMessageToSolaceTEST = async function (): Promise<string> { this.solaceClient = new SolaceClient(); let connectStatus; try { await this.solaceClient.Connect(); // wait for the connection to finish connectStatus = "Connected to Solace!"; // assign the local variable, not the class-level one } catch (error) { connectStatus = `${error}!`; } return "sibendu" + connectStatus; }
I have not yet looked at the SolaceClient.ts file.
1 -
Yes. I have able to manage the connectivity with Callback Listener. Thanks for Help @nicholasdgoodman
I need a help for Message Acknowledge details.
I am using MessageDeliveryModeType.PERSISTENT and deliver the message to Solace from API.
Every time I received "Message(s) Acknowledge" message with Correlation Key. even i put unknown topic name. I want to know more details about Acknowledgement message details like scenario.
- If my Topic Name is invalid or not properly setup
- If my Topic is down and unable to Send the consumer
- If any problem in Solace Broker itself during send() message
- if successfully transmit data to consumer
Thanks in advance!
0 -
Hi @Sibendu ,
regarding your questions i think it make sense to have a look into this blog to get to know more about durable enpoints.
and also this is helpful for ack/nackMessageDeliveryModeType.PERSISTENT.
This mode provides once-and-only-once message delivery. A Persistent delivery mode is used for Guaranteed Messaging, and this delivery mode is most appropriate for applications that require persistent storage of the messages they send or intend to receive. Persistent messages have the following characteristics:- They cannot be discarded or lost (once they are acknowledged).
- They cannot be reordered in the event of network topology changes.
- They cannot be delivered more than once to a single client (unless the redelivered message flag is applied).
- When they match subscriptions on durable endpoints, they are retained for a client when that client is not connected.
Persistent messages are most appropriate for applications that require persistent storage of the messages they send or intend to receive. The quality of service offered by Guaranteed Messaging is analogous to JMS Persistent messaging with durable subscriptions.
help pages
hope that helps.
Others may be able to add more details, but I think the questions are difficult to summarise in one answer.
0