🎄 Happy Holidays! 🥳
Most of Solace is closed December 24–January 1 so our employees can spend time with their families. We will re-open Thursday, January 2, 2024. Please expect slower response times during this period and open a support ticket for anything needing immediate assistance.
Happy Holidays!
Please note: most of Solace is closed December 25–January 2, and will re-open Tuesday, January 3, 2023.
Sending large amount of guaranteed messages with nodejs
Hi,
we want to send a large amount of guaranteed messages with nodejs.
Our code is similar to https://github.com/SolaceSamples/solace-samples-nodejs/blob/master/src/basic-samples/ConfirmedPublish.js
but without limited number of messages (we do not want to send 10 messages, but much more).
After sending a few messages, we got:Guaranteed Message Window Closed
Any ideas what we need to fix ?
I just assume, the Solace API will take care of the publish window. Or do we have to take notes of ack'ed messages vs published messages ?
Uli
Comments
-
Could we see the actual code?
This blog may help too... https://solace.com/blog/understanding-guaranteed-message-publish-window-sizes-and-acknowledgement/
1 -
Greetings! So the you are correct that the JavaScript API does require a slight amount of manual management when it comes to invoking send calls and tracking published vs acknowledged messages. The SDK does not directly handle all of these aspects in order to prevent certain negative side effects such as internal memory leaks and buffer overruns.
However, the good news is with some wrapper and helper utilities we can write some quite modern JavaScript around the Solace API which makes window and Ack management significantly easier.
Awaiting Individual Message Send Results
The first step to tackling this problem is to simplify message sending by creating a promise that resolves or rejects on each message acknowledgement or rejection, respectively. This pattern is easily accomplished by attaching a correlation key to each individual message and invoking the correct promise handler based on the triggered session events. Assume you have an existing connected Solace session and a message object:
const sendMessage = (session, message) => new Promise((resolve, reject) => { const correlationKey = Symbol(); message.setCorrelationKey(correlationKey); let onAck, onNak; const invoke = (handler) => (evt) => { if (evt.correlationKey !== correlationKey) { return; } session.removeListener(solace.SessionEventCode.ACKNOWLEDGED_MESSAGE, onAck); session.removeListener(solace.SessionEventCode.REJECTED_MESSAGE_ERROR, onNak); handler(evt); }; onAck = invoke(resolve); onNak = invoke(reject); session.on(solace.SessionEventCode.ACKNOWLEDGED_MESSAGE, onAck); session.on(solace.SessionEventCode.REJECTED_MESSAGE_ERROR, onNak); try { session.send(message); } catch (error) { reject(error); } });
Now, for simple use cases it is possible to perform one-by-one message sending as follows:
// given connected 'session': for(let n = 0; n < 1000; n++) { let message = createMessage(n); // defined elsewhere await sendMessage(session, message); // will wait until message is Acked }
Awaitable Semaphores
To increase throughput it is more performant to send several messages at once, up to an internal limit known as the publish window size. Attempting create too many in-flight messages results in an error, so we need a mechanism to easily track the number of in-progress, asynchronous sends - and to stop execution until in-progress tasks are complete. Many languages have threading primitives known as Semaphores which serve this purpose, and we can write a simple JavaScript implementation as follows:
const createSemaphore = (n) => { const awaiters = []; const maxCount = n; let count = 0; const acquire = () => { count++; return (count <= maxCount) ? Promise.resolve() : new Promise((resolve, reject) => awaiters.push({resolve, reject})); }; const release = () => { count--; const {resolve} = awaiters.shift() || {}; resolve && resolve(); }; const close = () => { awaiters.forEach(({reject}) => reject()); awaiters.splice(0); }; return { acquire, release, close }; };
Note this code and subsequent samples are written in functional style, but could also be written as ECMA classes
This simple utility creates a semaphore with an awaitable
acquire()
function which returns immediately if there are resources available, or otherwise returns an unresolved promise until the appropriate number ofrelease()
calls have been invoked. For example:const semaphore = createSemaphore(2); await semaphore.acquire(); // returns immediately await semaphore.acquire(); // returns immediately await semaphore.acquire(); // blocks until next time semaphore.release() is called
Creating a Multi-Message Async Publisher
Next we can combine these two concepts to create a two-stage message publisher: first a
publish(…)
call which will execute immediately if there are not too many messages in-flight, but otherwise will block until enough messages have been acknowledged. Once the promise resolves, we have an object which provides a function to check on the status of the individual message outcome and determine if they were acknowledged or rejected.Note this is similar to how the
fetch()
API works in Javascript, in that the initial promise resolves to return an object with fucntion properties.json()
and.text()
which must also be awaitedThis is accomplished within the publish call by first acquiring the semaphore before beginning execution, and once the async send result has completed, it releases the semaphore exactly once. As before, assuming we have an already connected
session
instance which was configured with some given windowSize:const createPublisher = (session, windowSize) => { const semaphore = createSemaphore(windowSize); session.setMaxListeners(windowSize); const publish = (message) => semaphore.acquire().then(() => { const sendResult = sendMessage(session, message); sendResult.finally(semaphore.release); return { result: () => sendResult }; }); return { publish }; }
Putting it altogether now, we can efficiently and rapidly send messages in Solace without explicitly monitoring of publish window limits by simply:
// given connected 'session' configured with 'windowSize': const publisher = createPublisher(session, windowSize); const acks = []; for(let n = 0; n < 1000; n++) { let message = createMessage(n); // defined elsewhere let ack = await publisher.publish(message); // this call will never overrun window size acks.push(ack.result()); // lets capture the results for now… } await Promise.all(acks); // All messages have been sent and Acked!
2