How to achieve Topic + Queue mechanism using NodeJS & AMQP
Hello,
I'm using NodeJS as my tech stack and created a Queue and that queue is subscribed to the topic.
Mechanism: Publish a message to the topic and subscribe to the msg from the queue, basically Topic to Queue Mapping.
Query: I want to build the above mechanism using the AMQP protocol with NodeJS.
Tried Approach: I've achieved the same mechanism using solclientjs but it does not use AMQP.
Could you please let me know which way I can do this?
I appreciate any help you can provide.
Comments
-
Hi @giri , Thank you for your response.
I've checked the code of the repo which you've shared.
It has producer & consumer files that use the amqp10 package, but if you see they are using a queue to send the data.
If I use the topic instead of a queue to send the data, it is not working.
So, I don't know if Solace supports AMQP to use topic-to-queue mapping.
I've created a subscription manually using the GUI interface of Solace.
Thank you.
0 -
@giri Adding code here..
I'm using
package here, as is no longer supported and suggests to use of rhea.Publish to Topic ( AMQP ) & Subscribe to Queue ( AMQP )
(1) Publish the data to the Topic using AMQP:// Publisher using AMQP const rhea = require("rhea"); const { URL } = require("url"); // Define connection URL const connectionString = "amqp://example.com:5672"; // Parse the connection URL const parsedUrl = new URL(connectionString); // Extract hostname and port const hostname = parsedUrl.hostname; const port = parsedUrl.port; // Create a new container const container = rhea.create_container(); // Connect to the server using the parsed hostname and port const connection = container.connect({ host: hostname, port: port }); connection.on("connection_open", function (context) { console.log("Connected to server successfully!"); // Create a sender link to send messages to the queue const sender = context.connection.open_sender({ target: { address: "my-topic" } }); // Define message content const messageContent = "Hello, this is a test message!"; // Create a message object with the content const message = { body: { content: messageContent } }; // Send the message sender.send(JSON.stringify(message)); console.log("Message sent:", messageContent); }); // Handle errors connection.on("error", function (context) { console.error("Error:", context.error); });
(2) Consume the data from the Queue using AMQP:
// Subscriber using AMQP const rhea = require("rhea"); const { URL } = require("url"); // Define connection URL const connectionString = "amqp://example.com:5672"; // Parse the connection URL const parsedUrl = new URL(connectionString); // Extract hostname and port const hostname = parsedUrl.hostname; const port = parsedUrl.port; // Create a new container const container = rhea.create_container(); // Connect to the server using the parsed hostname and port const connection = container.connect({ host: hostname, port: port }); connection.on("connection_open", function (context) { console.log("Connected to server successfully!"); // Create a receiver link to receive messages from the queue const receiver = context.connection.open_receiver({ source: { address: "my-queue" } }); // Define message handler for received messages receiver.on("message", function (context) { const contentBuffer = context.message.body.content; // Convert the content buffer to a string using utf8 encoding const messageBody = contentBuffer.toString('utf8'); console.log('Received message:', messageBody); }); }); // Handle errors connection.on("error", function (context) { console.error("Error:", context.error); });
Using the above code it connects to Solace successfully but data not produced into topic.
Now, if I just switch to MQTT as a producer it works fine, below is the code.
- Publish to Topic using ( MQTT ) & Subscribe to Queue using ( AMQP )
// Publisher Using MQTT const mqtt = require('mqtt'); // MQTT broker connection options const mqttBrokerUrl = "mqtt://example.com:1883"; const mqttClient = mqtt.connect(mqttBrokerUrl); // Publish a message to a topic mqttClient.on('connect', function () { console.log('Connected to MQTT broker successfully!'); mqttClient.publish('my-topic', JSON.stringify({ data: 'Hello, Find me!!'})); }); // Handle errors mqttClient.on('error', function (error) { console.error('Error:', error); });
0 -
Something that might be useful is a utility to just listen to all messages published onto Solace, just to snoop to see where/if the publisher is sending something. You could use the "Try Me!" built into the Solace PubSub+ Manager, or SdkPerf, or the Try Me CLI tool..? Whichever you choose, subscribe to both
>
and#*/>
to snoop all messages going through the broker. Makes debugging these things easier.Another thing to test is when you publish your message, use the convention
topic://<topicName>
. It might be possible the publisher API thinks you're trying to publish to a queue called "my-topic".0 -
Yup, did some testing… confirmed! Just cutting-pasting the "hello world" sample here: https://www.npmjs.com/package/rhea and having an SdkPerf subscriber listening to everything, I can see this:
$ ./sdkperf_java.sh -cip=0 -stl=">,#*/>" -md -q ^^^^^^^^^^^^^^^^^^ Start Message ^^^^^^^^^^^^^^^^^^^^^^^^^^^ Destination: Queue 'examples' Class Of Service: USER_COS_1 DeliveryMode: DIRECT Message Id: 7 Ack Immediately DMQ Eligible Binary Attachment: len=18 1f 00 00 00 12 48 65 6c 6c 6f 20 57 6f 72 6c 64 .....Hello.World 21 00 !. ^^^^^^^^^^^^^^^^^ End Message ^^^^^^^^^^^^^^^^^^^^^^^^^^^
So the publisher doing this:
connection.open_sender('examples');
is sending to the queue called examples. If I modify the code to saytopic://examples/multi/level/topic
, then my SdkPerf listener successfully sees the message published to the topic:^^^^^^^^^^^^^^^^^^ Start Message ^^^^^^^^^^^^^^^^^^^^^^^^^^^ Destination: Topic 'examples/multi/level/topic' Class Of Service: USER_COS_1 DeliveryMode: DIRECT Message Id: 8 Ack Immediately DMQ Eligible Binary Attachment: len=18 1f 00 00 00 12 48 65 6c 6c 6f 20 57 6f 72 6c 64 .....Hello.World 21 00 !. ^^^^^^^^^^^^^^^^^ End Message ^^^^^^^^^^^^^^^^^^^^^^^^^^^
Hope that helps!
0 -
Hi @Aaron Thank you for your response! appreciate it..
Yes, I tried as you mentioned to use
topic://
prefix while sending a message and it works well :)Do you think that this is a good approach to use Solace and its features with AMQP, or should I go with the Solace default protocol and its lib
?Thank you!
0