Sending large amount of guaranteed messages with nodejs

uherbst
uherbst Member, Employee Posts: 127 Solace Employee

Hi,

we want to send a large amount of guaranteed messages with nodejs.

Our code is similar to https://github.com/SolaceSamples/solace-samples-nodejs/blob/master/src/basic-samples/ConfirmedPublish.js

but without a fixed number of messages (we do not want to send just 10 messages, but many more).

After sending a few messages, we got:
Guaranteed Message Window Closed 

Any ideas what we need to fix?
I had assumed the Solace API would take care of the publish window. Or do we have to keep track of ack'ed messages versus published messages ourselves?

Uli


Comments

  • amackenzie
    amackenzie Member, Employee Posts: 262 Solace Employee

    Could we see the actual code?

    This blog may help too... https://solace.com/blog/understanding-guaranteed-message-publish-window-sizes-and-acknowledgement/

  • nicholasdgoodman
    nicholasdgoodman Member, Employee Posts: 43 Solace Employee
    edited October 2023 #3

    Greetings! You are correct that the JavaScript API requires a small amount of manual management when invoking send calls and tracking published versus acknowledged messages. The SDK does not handle all of these aspects itself, in order to prevent negative side effects such as internal memory leaks and buffer overruns.

    However, the good news is with some wrapper and helper utilities we can write some quite modern JavaScript around the Solace API which makes window and Ack management significantly easier.

    Awaiting Individual Message Send Results

    The first step to tackling this problem is to simplify message sending by creating a promise that resolves or rejects on each message acknowledgement or rejection, respectively. This pattern is easily accomplished by attaching a correlation key to each individual message and invoking the correct promise handler based on the triggered session events. Assume you have an existing connected Solace session and a message object:

    const sendMessage = (session, message) => new Promise((resolve, reject) => {
      const correlationKey = Symbol();
      message.setCorrelationKey(correlationKey);
        
      let onAck, onNak;
      const invoke = (handler) => (evt) => {
        if (evt.correlationKey !== correlationKey) {
          return;    
        }
        session.removeListener(solace.SessionEventCode.ACKNOWLEDGED_MESSAGE, onAck);
        session.removeListener(solace.SessionEventCode.REJECTED_MESSAGE_ERROR, onNak);
        handler(evt);
      };
      onAck = invoke(resolve);
      onNak = invoke(reject);
        
      session.on(solace.SessionEventCode.ACKNOWLEDGED_MESSAGE, onAck);
      session.on(solace.SessionEventCode.REJECTED_MESSAGE_ERROR, onNak);
    
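      try {
        session.send(message);
      } catch (error) {
        // send() failed synchronously, so no ack or nak will arrive: detach the listeners
        session.removeListener(solace.SessionEventCode.ACKNOWLEDGED_MESSAGE, onAck);
        session.removeListener(solace.SessionEventCode.REJECTED_MESSAGE_ERROR, onNak);
        reject(error);
      }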
    });
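
As a variation on the helper above, here is a minimal sketch that registers a single pair of listeners per session and looks up pending sends in a Map keyed by correlation key, instead of attaching two listeners per message. This is an illustration under assumptions, not part of the Solace API: `createAckTracker`, `createMockSession`, `createMockMessage` and the `EVENT` names are hypothetical, and the mock session is built on Node's EventEmitter only so the sketch runs on its own. With a real session you would listen for solace.SessionEventCode.ACKNOWLEDGED_MESSAGE and solace.SessionEventCode.REJECTED_MESSAGE_ERROR.

```javascript
const { EventEmitter } = require('events');

// Stand-ins for the solace.SessionEventCode values (assumption for the demo).
const EVENT = { ACK: 'ack', NAK: 'nak' };

// Hypothetical helper: one listener pair per session, many pending sends.
const createAckTracker = (session) => {
  const pending = new Map(); // correlationKey -> { resolve, reject }

  const settle = (method) => (evt) => {
    const entry = pending.get(evt.correlationKey);
    if (!entry) return;                 // event belongs to someone else
    pending.delete(evt.correlationKey);
    entry[method](evt);
  };
  session.on(EVENT.ACK, settle('resolve'));
  session.on(EVENT.NAK, settle('reject'));

  const send = (message) => new Promise((resolve, reject) => {
    const correlationKey = Symbol();
    message.setCorrelationKey(correlationKey);
    pending.set(correlationKey, { resolve, reject });
    try {
      session.send(message);
    } catch (error) {
      pending.delete(correlationKey);   // nothing is in flight, forget the key
      reject(error);
    }
  });

  return { send, pendingCount: () => pending.size };
};

// Mock session and message, for demonstration only.
const createMockSession = () => {
  const session = new EventEmitter();
  session.sent = [];
  session.send = (message) => session.sent.push(message);
  return session;
};
const createMockMessage = () => {
  let key;
  return { setCorrelationKey: (k) => { key = k; }, getCorrelationKey: () => key };
};

const session = createMockSession();
const tracker = createAckTracker(session);
tracker.send(createMockMessage()).then(() => console.log('first message acked'));
tracker.send(createMockMessage());

// Simulate the broker acknowledging the first message only.
session.emit(EVENT.ACK, { correlationKey: session.sent[0].getCorrelationKey() });
console.log(tracker.pendingCount()); // the second send is still awaiting its ack
```

The trade-off is one shared Map instead of many short-lived listeners, so the listener count on the session stays constant regardless of how many messages are in flight.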
    

    Now, for simple use cases it is possible to perform one-by-one message sending as follows:

    // given connected 'session':
    for(let n = 0; n < 1000; n++) {
      let message = createMessage(n);      // defined elsewhere
      await sendMessage(session, message); // will wait until message is Acked
    }
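
Because each iteration of the loop above waits for a full publish and acknowledge round trip, the round-trip time caps the achievable rate. A rough back-of-the-envelope sketch follows; the 5 ms round trip is an assumed figure rather than a measurement, and `estimateMaxRate` is a hypothetical helper, not part of any API:

```javascript
// Rough upper bound on the publish rate when at most `inFlight` messages may
// be unacknowledged at once and each ack takes `roundTripMs` to arrive.
// Ignores broker processing time, serialization cost and bandwidth limits.
const estimateMaxRate = (inFlight, roundTripMs) => (inFlight * 1000) / roundTripMs;

console.log(estimateMaxRate(1, 5));  // one-by-one sending: 200 messages per second
console.log(estimateMaxRate(50, 5)); // 50 messages in flight: 10000 messages per second
```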
    

    Awaitable Semaphores

    To increase throughput, it is better to have several messages in flight at once, up to an internal limit known as the publish window size. Attempting to create too many in-flight messages results in an error, so we need a mechanism to track the number of in-progress asynchronous sends and to pause execution until in-progress tasks complete. Many languages have threading primitives known as semaphores which serve this purpose, and we can write a simple JavaScript implementation as follows:

    const createSemaphore = (n) => {
      const awaiters = [];
      const maxCount = n;
      
      let count = 0;
      const acquire = () => {
        count++;
        return (count <= maxCount) ?
          Promise.resolve() :
          new Promise((resolve, reject) => awaiters.push({resolve, reject}));
      };
      const release = () => {
        count--;
        const {resolve} = awaiters.shift() || {};
        resolve && resolve();
      };
      const close = () => {
        awaiters.forEach(({reject}) => reject());
        awaiters.splice(0);
      };
      return { acquire, release, close };
    };
    

    Note: this code and the subsequent samples are written in a functional style, but could also be written as ECMAScript classes.

    This simple utility creates a semaphore with an awaitable acquire() function which resolves immediately if there are resources available, or otherwise returns a promise that stays pending until enough release() calls have been made. For example:

    const semaphore = createSemaphore(2);
    await semaphore.acquire();  // returns immediately
    await semaphore.acquire();  // returns immediately
    await semaphore.acquire();  // blocks until next time semaphore.release() is called
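
For readers who prefer classes, here is a roughly equivalent sketch of the same semaphore written as a class. The `Semaphore` name, the `waiting` getter and the error message passed on close are additions for illustration and do not appear in the functional version:

```javascript
// Class-based sketch of the semaphore. The `waiting` getter is an addition
// for illustration and is not part of the functional version.
class Semaphore {
  #awaiters = [];
  #count = 0;
  #maxCount;

  constructor(maxCount) {
    this.#maxCount = maxCount;
  }

  // Resolves immediately while capacity remains, otherwise queues the caller.
  acquire() {
    this.#count++;
    return (this.#count <= this.#maxCount)
      ? Promise.resolve()
      : new Promise((resolve, reject) => this.#awaiters.push({ resolve, reject }));
  }

  // Frees one slot and wakes the longest-waiting caller, if there is one.
  release() {
    this.#count--;
    const next = this.#awaiters.shift();
    if (next) next.resolve();
  }

  // Rejects every queued caller, for example when the session is shutting down.
  close() {
    this.#awaiters.forEach(({ reject }) => reject(new Error('Semaphore closed')));
    this.#awaiters.length = 0;
  }

  get waiting() {
    return this.#awaiters.length;
  }
}

const semaphore = new Semaphore(2);
semaphore.acquire();
semaphore.acquire();
semaphore.acquire();            // queued: both slots are taken
console.log(semaphore.waiting); // 1
semaphore.release();            // hands a slot to the queued caller
console.log(semaphore.waiting); // 0
```

One difference to keep in mind: class methods lose their `this` binding when passed around as bare references, so a callback should be written as `() => semaphore.release()` rather than `semaphore.release`.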
    

    Creating a Multi-Message Async Publisher

    Next we can combine these two concepts to create a two-stage message publisher. The first stage is a publish(…) call which proceeds immediately if there are not too many messages in flight, but otherwise waits until enough messages have been acknowledged. Once that promise resolves, we have an object with a result() function which reports the outcome of the individual message: whether it was acknowledged or rejected.

    Note: this is similar to how the fetch() API works in JavaScript, in that the initial promise resolves to an object with function properties .json() and .text(), which must also be awaited.

    This is accomplished within the publish call by first acquiring the semaphore before beginning execution, and once the async send result has completed, it releases the semaphore exactly once. As before, assuming we have an already connected session instance which was configured with some given windowSize:

    const createPublisher = (session, windowSize) => {
      const semaphore = createSemaphore(windowSize);
      session.setMaxListeners(windowSize);
        
      const publish = (message) => semaphore.acquire().then(() => {        
        const sendResult = sendMessage(session, message);
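        // Release on either outcome. Passing two handlers to then() avoids the
        // unhandled rejection that a bare finally() chain would raise on a NAK.
        sendResult.then(semaphore.release, semaphore.release);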
            
        return {
          result: () => sendResult   
        };
      });
        
      return {
        publish
      };
    }
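
The publisher above assumes a session that was configured with a known windowSize. Here is a minimal sketch of what those session properties might look like. The broker URL, VPN name and credentials are placeholder assumptions, and the property names reflect my understanding of the solclientjs session properties, so check them against the API reference for your version:

```javascript
// Assumed values throughout: replace them with your own broker details.
const windowSize = 50;

const sessionProperties = {
  url: 'ws://localhost:8008',
  vpnName: 'default',
  userName: 'default',
  password: 'default',
  publisherProperties: {
    windowSize, // maximum number of unacknowledged guaranteed messages
    // Per-message acknowledgements, as in the ConfirmedPublish.js sample:
    // acknowledgeMode: solace.MessagePublisherAcknowledgeMode.PER_MESSAGE,
  },
};

// With solclientjs loaded as `solace`, the session and publisher would then
// be created along these lines:
//   const session = solace.SolclientFactory.createSession(sessionProperties);
//   const publisher = createPublisher(session, windowSize);
console.log(sessionProperties.publisherProperties);
```

Passing the same windowSize value to both the session properties and createPublisher keeps the semaphore limit in step with the limit the API enforces.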
    

    Putting it all together, we can send messages in Solace rapidly and efficiently without explicitly monitoring publish window limits:

    // given connected 'session' configured with 'windowSize':
    
    const publisher = createPublisher(session, windowSize);
    const acks = [];
    for(let n = 0; n < 1000; n++) {
      let message = createMessage(n);             // defined elsewhere
      let ack = await publisher.publish(message); // this call will never overrun window size
      acks.push(ack.result());                    // let's capture the results for now…
    }
    await Promise.all(acks);
    // All messages have been sent and Acked!
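
One caveat with the loop above: Promise.all rejects as soon as any single message is rejected, and a rejection that arrives before Promise.all is attached may be reported by Node as unhandled. A minimal sketch of one way to handle this is to convert each result into a value that never rejects, at the moment it is captured. `toOutcome` and `summarize` are hypothetical helper names, and the promises below are stand-ins for the values returned by ack.result():

```javascript
// Convert a send result into a value that never rejects. The handlers are
// attached immediately, so a late NAK cannot surface as an unhandled rejection.
const toOutcome = (sendResult) =>
  sendResult.then(
    () => ({ acked: true }),
    (error) => ({ acked: false, error })
  );

// Count how many messages were acknowledged and how many were rejected.
const summarize = (outcomes) => ({
  acked: outcomes.filter((o) => o.acked).length,
  rejected: outcomes.filter((o) => !o.acked).length,
});

// Stand-ins for the promises returned by ack.result() in the loop above.
const fakeResults = [
  Promise.resolve(),
  Promise.reject(new Error('simulated NAK')),
  Promise.resolve(),
];

Promise.all(fakeResults.map(toOutcome))
  .then((outcomes) => console.log(summarize(outcomes))); // { acked: 2, rejected: 1 }
```

In the publishing loop this would amount to pushing `toOutcome(ack.result())` instead of `ack.result()`, then inspecting the summary to decide whether any messages need to be resent.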