How to achieve Topic + Queue mechanism using NodeJS & AMQP

Options
jayMehta
jayMehta Member Posts: 4
edited May 4 in PubSub+ Event Broker #1

Hello,

I'm using NodeJS as my tech stack and created a Queue and that queue is subscribed to the topic.

Mechanism: Publish a message to the topic and subscribe to the msg from the queue, basically Topic to Queue Mapping.

Query: I want to build the above mechanism using the AMQP protocol with NodeJS.

Tried Approach: I've achieved the same mechanism using solclientjs but it does not use AMQP.

Could you please let me know which way I can do this?

I appreciate any help you can provide.

Comments

  • giri
    giri Member, Administrator, Employee Posts: 108 admin
    Options
  • jayMehta
    jayMehta Member Posts: 4
    Options

    Hi @giri , Thank you for your response.

    I've checked the code of the repo which you've shared.

    It has producer & consumer files that use the amqp10 package, but if you see they are using a queue to send the data.

    If I use the topic instead of a queue to send the data, it is not working.

    So, I don't know if Solace supports AMQP to use topic-to-queue mapping.

    I've created a subscription manually using the GUI interface of Solace.

    Thank you.

  • jayMehta
    jayMehta Member Posts: 4
    Options

    @giri Adding code here..

    I'm using https://www.npmjs.com/package/rhea package here, as https://www.npmjs.com/package/amqp10 is no longer supported and suggests to use of rhea.

    Publish to Topic ( AMQP ) & Subscribe to Queue ( AMQP )

    (1) Publish the data to the Topic using AMQP:

    // Publisher using AMQP
    const rhea = require("rhea");
    const { URL } = require("url");
    
    // Define connection URL
    const connectionString = "amqp://example.com:5672";
    
    // Parse the connection URL
    const parsedUrl = new URL(connectionString);
    
    // Extract hostname and port
    const hostname = parsedUrl.hostname;
    const port = parsedUrl.port;
    
    // Create a new container
    const container = rhea.create_container();
    
    // Connect to the server using the parsed hostname and port
    const connection = container.connect({ host: hostname, port: port });
    
    connection.on("connection_open", function (context) {
      console.log("Connected to server successfully!");
    
      // Create a sender link to send messages to the queue
      const sender = context.connection.open_sender({ target: { address: "my-topic" } });
    
      // Define message content
      const messageContent = "Hello, this is a test message!";
    
      // Create a message object with the content
      const message = {
        body: { content: messageContent }
      };
    
      // Send the message
      sender.send(JSON.stringify(message));
    
      console.log("Message sent:", messageContent);
    });
    
    // Handle errors
    connection.on("error", function (context) {
      console.error("Error:", context.error);
    });
    
    

    (2) Consume the data from the Queue using AMQP:

    // Subscriber using AMQP
    const rhea = require("rhea");
    const { URL } = require("url");
    
    // Define connection URL
    const connectionString = "amqp://example.com:5672";
    
    // Parse the connection URL
    const parsedUrl = new URL(connectionString);
    
    // Extract hostname and port
    const hostname = parsedUrl.hostname;
    const port = parsedUrl.port;
    
    // Create a new container
    const container = rhea.create_container();
    
    // Connect to the server using the parsed hostname and port
    const connection = container.connect({ host: hostname, port: port });
    
    connection.on("connection_open", function (context) {
      console.log("Connected to server successfully!");
    
      // Create a receiver link to receive messages from the queue
      const receiver = context.connection.open_receiver({ source: { address: "my-queue" } });
    
      // Define message handler for received messages
      receiver.on("message", function (context) {
        const contentBuffer = context.message.body.content;
        
        // Convert the content buffer to a string using utf8 encoding
        const messageBody = contentBuffer.toString('utf8');
        
        console.log('Received message:', messageBody);
      });
    });
    
    // Handle errors
    connection.on("error", function (context) {
      console.error("Error:", context.error);
    });
    
    

    Using the above code it connects to Solace successfully but data not produced into topic.

    Now, if I just switch to MQTT as a producer it works fine, below is the code.

    • Publish to Topic using ( MQTT ) & Subscribe to Queue using ( AMQP )
    // Publisher Using MQTT
    const mqtt = require('mqtt');
    
    // MQTT broker connection options
    const mqttBrokerUrl = "mqtt://example.com:1883";
    const mqttClient = mqtt.connect(mqttBrokerUrl);
    
    // Publish a message to a topic
    mqttClient.on('connect', function () {
        console.log('Connected to MQTT broker successfully!');
        mqttClient.publish('my-topic', JSON.stringify({ data: 'Hello, Find me!!'}));
    });
    
    // Handle errors
    mqttClient.on('error', function (error) {
        console.error('Error:', error);
    });
    

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 541 admin
    Options

    Something that might be useful is a utility to just listen to all messages published onto Solace, just to snoop to see where/if the publisher is sending something. You could use the "Try Me!" built into the Solace PubSub+ Manager, or SdkPerf, or the Try Me CLI tool..? Whichever you choose, subscribe to both > and #*/> to snoop all messages going through the broker. Makes debugging these things easier.

    Another thing to test is when you publish your message, use the convention topic://<topicName>. It might be possible the publisher API thinks you're trying to publish to a queue called "my-topic".

  • Aaron
    Aaron Member, Administrator, Moderator, Employee Posts: 541 admin
    edited May 7 #6
    Options

    Yup, did some testing… confirmed! Just cutting-pasting the "hello world" sample here: https://www.npmjs.com/package/rhea and having an SdkPerf subscriber listening to everything, I can see this:

    $ ./sdkperf_java.sh -cip=0 -stl=">,#*/>" -md -q
    
    ^^^^^^^^^^^^^^^^^^ Start Message ^^^^^^^^^^^^^^^^^^^^^^^^^^^
    Destination:                            Queue 'examples'
    Class Of Service:                       USER_COS_1
    DeliveryMode:                           DIRECT
    Message Id:                             7
    Ack Immediately
    DMQ Eligible
    Binary Attachment:                      len=18
    1f 00 00 00 12 48 65 6c    6c 6f 20 57 6f 72 6c 64    .....Hello.World
    21 00                                                 !.
    ^^^^^^^^^^^^^^^^^ End Message ^^^^^^^^^^^^^^^^^^^^^^^^^^^
    

    So the publisher doing this: connection.open_sender('examples'); is sending to the queue called examples. If I modify the code to say topic://examples/multi/level/topic, then my SdkPerf listener successfully sees the message published to the topic:

    ^^^^^^^^^^^^^^^^^^ Start Message ^^^^^^^^^^^^^^^^^^^^^^^^^^^
    Destination:                            Topic 'examples/multi/level/topic'
    Class Of Service:                       USER_COS_1
    DeliveryMode:                           DIRECT
    Message Id:                             8
    Ack Immediately
    DMQ Eligible
    Binary Attachment:                      len=18
    1f 00 00 00 12 48 65 6c    6c 6f 20 57 6f 72 6c 64    .....Hello.World
    21 00                                                 !.
    ^^^^^^^^^^^^^^^^^ End Message ^^^^^^^^^^^^^^^^^^^^^^^^^^^
    

    Hope that helps!

  • jayMehta
    jayMehta Member Posts: 4
    Options

    Hi @Aaron Thank you for your response! appreciate it..

    Yes, I tried as you mentioned to use topic:// prefix while sending a message and it works well :)

    Do you think that this is a good approach to use Solace and its features with AMQP, or should I go with the Solace default protocol and its lib https://www.npmjs.com/package/solclientjs ?

    Thank you!