NodeJS Fork Balancer

A while back I read an article about offloading a high-memory task to a separate child process. I implemented the same approach and latency did go down, but I'm currently facing a weird issue.

Problem Statement: Fastify server – the browser makes a request to the Fastify server, and inside Fastify I need to make 100+ parallel axios API calls to the downstream service (don't ask why I'm doing this – we currently have some constraints). I return the response to the browser once all the API calls succeed. The latency issue arises from these parallel API calls, particularly at higher TPS (transactions per second) to the Fastify server.
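For context, the fan-out-and-await-all step can be sketched in isolation. `fetchPage` below is a hypothetical stand-in for the real axios call; `Promise.allSettled` is used so that one failed page doesn't reject the whole batch:

```typescript
// Hypothetical stand-in for the real downstream call: page 3 fails,
// every other page resolves with its own number.
const fetchPage = async (page: number): Promise<number[]> =>
  page === 3 ? Promise.reject(new Error('boom')) : [page];

async function fetchAll(totalPages: number): Promise<number[]> {
  // Fire all requests at once (the fan-out)...
  const requests = Array.from({ length: totalPages }, (_, i) => fetchPage(i));
  // ...then wait for every one, successful or not.
  const results = await Promise.allSettled(requests);
  const list: number[] = [];
  for (const r of results) {
    if (r.status === 'fulfilled') list.push(...r.value);
  }
  return list;
}
```

The real code collects partial failures the same way, via the `isPartialFailed` flag shown further down.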

Solution: To mitigate the latency issue, I offloaded the parallel-call logic to a child process using Node.js's fork functionality. As expected, this reduced latency. However, the issue I'm now facing is that when there are many parallel requests to my server, approximately 60% of them receive incorrect responses.


Implementation:
ForkBalancer.js

import { ChildProcess, fork } from 'child_process';

const requestLimit = 0;

interface forkResponse {
  kill: boolean;
  string?: string;
}

class ForkBalancer {
  path: string;
  forks: number;
  maxRAM?: number;
  args?: Array<string>;

  private activeFork: number;
  private resolvers = new Map();
  private renderers: Array<ChildProcess>;

  constructor({ path = '', forks = 5, maxRAM = 250, args = [] }) {
    this.activeFork = 0;
    this.forks = forks;
    this.maxRAM = maxRAM;
    this.path = path;
    this.args = args;
    this.renderers = Array.from({ length: forks }, () => this.createFork());
  }

  public getFromRenderer(params: any): Promise<forkResponse> {
    const { resolvers, maxRAM, activeFork, restartFork, renderers } = this;
    const renderer = renderers[activeFork];

    return new Promise(function (resolve, reject) {
      try {
        renderer.once('message', (res: any) => {
          resolvers.delete(params.request.url);
          resolve(res);

          if (res.kill) restartFork();
        });

        if (!resolvers.has(params.request.url)) {
          renderer.setMaxListeners(requestLimit);
          resolvers.set(params.request.url, resolve);
          renderer.send({ ...params, maxRAM });
        }
      } catch (error) {
        resolvers.delete(params.request.url);
        reject(error);
      }
    });
  }

  private createFork = () => {
    const { path, args } = this;
    return fork(path, args);
  };

  private restartFork = () => {
    const { activeFork, renderers, next, createFork } = this;
    const renderer = renderers[activeFork];
    next();
    renderer.kill();
    this.renderers[activeFork] = createFork();
  };

  private next = () => {
    const { activeFork, forks } = this;
    if (activeFork === forks - 1) {
      this.activeFork = 0;
    } else {
      this.activeFork++;
    }
  };
}

export default ForkBalancer;
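For reference, the round-robin selection in `next()` reduces to a modulo increment; a minimal standalone sketch of that wrap-around logic (the function name `nextFork` is mine, not from the class):

```typescript
// Round-robin index advance: step through fork slots 0..forks-1,
// wrapping back to 0 after the last one.
const nextFork = (active: number, forks: number): number =>
  (active + 1) % forks;
```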

ParallelAPI.js

import axiosInstance, { AxiosRequestConfig } from 'axios';

const maxRAM = 128;

process.on('message', async (params: any) => {
  const { totalPages, offset: offsetProps = 0, PAGE_SIZE_LIMIT, request, body, testId, url } = params;

  const requests = [];
  for (let offset = offsetProps; offset <= totalPages; offset++) {
    requests.push(
      axiosInstance.post(
        `API_URL/search/v2?page=${offset}&limit=${PAGE_SIZE_LIMIT}`,
        body,
        {
          headers: {
            accept: 'application/json',
            authorization: `${request.token?.token_type} ${request.token?.access_token}`,
          },
        },
      ),
    );
  }

  const results = await Promise.allSettled(requests);

  const list: any = [];
  let isPartialFailed = false;
  results.forEach((result) => {
    if (result.status === 'fulfilled') {
      const quotesListData = result.value?.data?.quotes;
      if (Array.isArray(quotesListData)) {
        list.push(...quotesListData);
      }
    } else {
      isPartialFailed = true;
    }
  });

  const { heapUsed } = process.memoryUsage();

  if (process.send) {
    process.send({
      key: request.url,
      list,
      url: request.url,
      testId,
      isPartialFailed,
      kill: heapUsed > maxRAM * 1024 * 1024,
    });
  }
});
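The `kill` flag the child sends back is just a heap-budget check; pulled out as a standalone sketch (the function name `shouldRestart` is mine, and `maxRAM` is a budget in megabytes as in the code above):

```typescript
// Returns true when the current process's heap usage exceeds
// a budget given in megabytes, signalling the parent to restart it.
function shouldRestart(maxRAM: number): boolean {
  const { heapUsed } = process.memoryUsage(); // heapUsed is in bytes
  return heapUsed > maxRAM * 1024 * 1024;     // convert MB budget to bytes
}
```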

Usage:

import path from 'path';
import ForkBalancer from './forkBalancer';

const forkBalancer = new ForkBalancer({
 path: path.resolve(__dirname, './ParallelAPI'),
});

const handler = async (req, res) => {
  const { body } = req.body;
  const { testId } = req.query;
  const response = await forkBalancer.getFromRenderer({
    request: {
      token: req.token,
      url: req.url,
    },
    testId,
    PAGE_SIZE_LIMIT: 50,
    body,
    totalPages: 100,
  });
  return res.send(response);
};

 fastify.post('/getAllItems', handler);