My issue is quite weird, let me first explain what I am trying to achieve here. My goal is to have two services, both built in node js. First service is a web server built with express js to work as an API with the client. The second service is a data processing service that receives the request from the web server and processes the data.
Both of the services are connected via RabbitMQ queues and channels. Now the problem is that when I send the first request after starting the server, it works as expected, the data processing service sends back the processed data and it is then sent to the client.
On the second request, the web server demands two responses from the data processing service to work. On third request, it demands three responses, each time it demands one extra response. I thought maybe the messages are not getting acknowledged that maybe causing this problem. I have tried out every single thing I could from the docs and StackoverFlow but I can’t seem to understand this.
I even tried to implement same solution in GoLang where I encountered the exact same issue. I am providing the Node JS code, please do let me know what’s wrong here and how can I fix it. Thanks you
Web Server:
const express = require("express")
const app = express();
const amqp = require("amqplib")
let channel;
const QUEUE_REQUEST = "data_request_queue"
const QUEUE_RESPONSE = "data_response_queue"
async function rabbitMQ() {
const connection = await amqp.connect("amqp://guest:guest@localhost:5672/")
const channel = await connection.createChannel();
await channel.assertQueue(QUEUE_REQUEST, {durable:false})
await channel.assertQueue(QUEUE_RESPONSE, {durable:false})
return channel;
}
app.get("/", async (req, res) => {
await channel.sendToQueue(QUEUE_REQUEST, Buffer.from("requesting data..."))
const msg = await new Promise((resolve, reject) => {
channel.consume(QUEUE_RESPONSE, msg => {
if (msg !== null) {
channel.ack(msg)
resolve(msg.content.toString())
} else {
channel.ack(msg)
reject("could not receive msg")
}
})
})
return res.json({message: msg})
})
app.get("/clean", async (req, res) => {
await channel.purgeQueue(QUEUE_REQUEST)
await channel.purgeQueue(QUEUE_RESPONSE)
return res.json({ message: "cleaned queues"})
})
app.listen(3000, async () => {
console.log("Listening on port: 3000")
channel = await rabbitMQ();
})
Data Processing Service
const amqp = require("amqplib/callback_api")
const QUEUE_REQUEST = "data_request_queue"
const QUEUE_RESPONSE = "data_response_queue"
amqp.connect("amqp://guest:guest@localhost:5672/", function (error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function (error1, channel) {
if (error1) {
throw error1
}
channel.assertQueue(QUEUE_REQUEST, {durable: false});
channel.assertQueue(QUEUE_RESPONSE, {durable: false});
channel.consume(QUEUE_REQUEST, async (msg) => {
if (msg !== null) {
channel.sendToQueue(QUEUE_RESPONSE, Buffer.from("data..."))
channel.ack(msg)
} else {
console.log("consumer cancelled")
}
})
});
})
Side Note: I am first time trying to implement this multi-service architecture just to learn how it works.