I am working on chat app. where I use different socket events to send message from client to server through sockets. Here is how it works client send request with jwt on socket I check it is connected before or not if it is just delete that session and create new session and save it’s information in redies. Here is my server file code like this
require("dotenv").config();
const port = 8181;
const cluster = require("cluster");
const io_redis = require("socket.io-redis");
const num_processes = require("os").cpus().length;
import * as net from "net";
import cors from "cors";
import * as http from "http";
import router from "./routes";
import { PeerServer } from "peer";
import express from "express";
import * as socket from "socket.io";
import * as farmhash from "farmhash";
import cookieParser from "cookie-parser";
import { socketMain } from "./socket.io/socketMain";
import { inititalizeMongoDb } from "./database/mongoInstance";
import { isAuthSocket } from "./middlewares/isAuthSocket.middleware";
import { deleteOldMessageCron } from './services/deletemessagecron';
import { createClient, RedisClientType } from "redis";
// Create the Redis client using a URL
const redisClient: RedisClientType = createClient({ url: '' });
redisClient.connect();
(async () => {
if (cluster.isMaster) {
const workers: any = [];
const spawn = (i: number) => {
workers[i] = cluster.fork();
workers[i].on("exit", () => {
console.log("respawning worker", i);
spawn(i);
});
};
for (var i = 0; i < num_processes; i++) {
spawn(i);
}
const worker_index = (ip: string, len: number) => {
return farmhash.fingerprint32(ip) % len;
};
const server: net.Server = net.createServer(
{ pauseOnConnect: true },
(connection: net.Socket) => {
const worker =
workers[worker_index(connection.remoteAddress, num_processes)];
worker.send("sticky-session:connection", connection);
}
);
server.listen(port);
console.log(`Master listening on port ${port}`);
} else {
let app = express();
app.use(express.json({ limit: '50mb' }));
app.use(express.urlencoded({ limit: '50mb', extended: true }));
app.use(cookieParser());
app.use(
cors({
origin: [ "file://", "http://localhost", "http://localhost:3000"],
credentials: true,
})
);
app.use("/", router);
const server: http.Server = app.listen(0, "localhost");
console.log("Worker listening...");
const io = new socket.Server(server, {
cors: {
origin: [ "file://", "http://localhost", "http://localhost:3000"],
credentials: true,
},
pingTimeout: 120000,
pingInterval: 25000,
});
io.adapter(io_redis({
url: process.env.REDIS_URL,
retryStrategy: (times: any) => {
const delay = Math.min(times * 50, 2000);
return delay;
}
}));
await inititalizeMongoDb();
deleteOldMessageCron();
io.use(isAuthSocket);
io.on("error", (err: any) => {
console.log("Socket.io Error:", err);
});
io.on("connection", (socket: socket.Socket) => {
console.log("connected to socket server", socket.id);
socketMain(io, socket, redisClient);
console.log(`connected to worker: ${cluster.worker.id}`);
});
process.on("message", (message, connection) => {
if (message !== "sticky-session:connection") {
return;
}
server.emit("connection", connection);
//@ts-ignore
connection.resume();
});
}
})();
my socketMain file where all events get handled
import { callOtherUser } from "./handlers/callOtherUser.handler";
import { disconnectVideoCall } from "./handlers/disconnectCall.handler";
import { getTotalUsers } from "./handlers/getTotalUsers.handler";
import { handleActiveSession } from "./handlers/handleActiveSession";
import { initialSocketConfig } from "./handlers/initialVerification";
import { iTextMessage } from "./handlers/iTextMessage.handler";
import { joinVideoRoom } from "./handlers/joinVideoRoom.handler";
import { rejectVideoCall } from "./handlers/rejectCall.handler";
import { socketDisconnect } from "./handlers/socketDisconnect.handler";
import { updateGroupInfo } from "./handlers/updateGroupInfo.handler";
import { updateOthersChats } from "./handlers/updateOthersChats.handler";
import { updateUserProfile } from "./handlers/updateUserProfile.handler";
import { userOnCall } from "./handlers/userOnCall.handler";
import { MarkAsReadMessage } from './handlers/MarkAsReadMessage.handler';
import { NickName } from './handlers/NickName.handler';
import { switchActiveChat } from "./handlers/switchActiveChat.handler";
import { RedisClientType } from "redis";
import { blockUser } from './blockUser.handler';
export const socketMain = async (io: any, socket: any, redisClient: RedisClientType) => {
try {
let heartbeatTimeout: NodeJS.Timeout;
const { _id, db, userPayload } = await initialSocketConfig(
io,
socket
);
handleActiveSession(io, socket, _id, userPayload?.displayName, redisClient);
socket.emit("signInSuccess", {
objectId: _id,
displayName: userPayload?.displayName,
email: userPayload?.email,
avatar: userPayload?.avatar,
createdOn: userPayload?.createdOn,
about: userPayload?.about,
lastSeen: userPayload?.lastSeen,
});
socket.on("callOtherUser", (payload: any) =>
callOtherUser(io, _id, db, payload)
);
socket.on("join-vc-room", (roomId: string, peerUserId: string) =>
joinVideoRoom(socket, roomId, peerUserId)
);
socket.on("diconnect-from-call", (roomId: string, peerUserId: string) =>
disconnectVideoCall(socket, roomId, peerUserId)
);
socket.on("reject-call", (roomId: string) =>
rejectVideoCall(socket, roomId)
);
socket.on("user-on-call", (roomId: string) => userOnCall(socket, roomId));
socket.on("getTotalUsers", () => getTotalUsers(db, socket, _id));
socket.broadcast.emit("updateTotalUsers", {
objectId: userPayload?._id,
displayName: userPayload?.displayName,
avatar: userPayload?.avatar,
createdOn: userPayload?.createdOn,
about: userPayload?.about,
lastSeen: userPayload?.lastSeen,
});
socket.on(
"updateUserProfile",
async (payload: any) => await updateUserProfile(socket, _id, payload, db)
);
socket.on("updateGroupInfo", async (payload: any) =>
updateGroupInfo(io, _id, payload, db)
);
socket.on("updateOthersChats", (payload: any) => {
console.log("updateOthersChats", payload)
updateOthersChats(db, io, _id, payload)
}
);
socket.on("iTextMessage", async (payload: any) =>
iTextMessage(io, socket, payload, db, _id)
);
socket.on("markAsReadMessage", async (payload: any) =>
MarkAsReadMessage(io, socket, payload, db, _id)
);
socket.on("setNickname", async (payload: any) =>
NickName(io, payload, db, _id)
);
socket.broadcast.emit("online", _id);
socket.on("switchActiveChat", async (payload: any) =>
switchActiveChat(io, socket, _id, db, payload)
);
socket.on("user-blocked", async (payload: any) => {
blockUser(io, socket, _id, db, payload)
})
socket.on('heartbeat', (data: any) => {
console.log('Client heartbeat:', socket.id);
// Respond immediately to client
socket.emit('heartbeat_received');
// Clear existing timeout
if (heartbeatTimeout) {
clearTimeout(heartbeatTimeout);
}
// Set new timeout for missed heartbeat
heartbeatTimeout = setTimeout(() => {
console.log('Client heartbeat timeout:', socket.id);
socket.disconnect(true);
}, 90000); // 90 seconds (allowing for network delays)
});
socket.on("disconnect", async (reason: any) => {
if (heartbeatTimeout) {
clearTimeout(heartbeatTimeout);
}
console.log("DISCONNECT REASON:", reason);
console.log("Socket Namespace:", socket.nsp.name);
console.log("Socket ID:", socket.id);
try {
await socketDisconnect(socket, _id, db, redisClient);
} catch (err) {
console.error("Error during socket disconnect:", err);
}
});
} catch (err) {
console.log("MAIN SOCKET ERR", err);
}
};
handleActiveSession file where user connections save or removed from redies
import * as socket from "socket.io";
import {
getActiveUserByObjectId,
removeActiveUserByObjectId,
addToActiveUsers,
getActiveUsers
} from "../../utils/activeUsers";
import { RedisClientType } from "redis";
export const handleActiveSession = async (
io: socket.Server,
socket: socket.Socket,
_id: string,
Name: string,
redisClient: RedisClientType
) => {
const userKey = `user:${_id}`;
const userSession = {
socketId: socket.id,
objectId: _id,
Name: Name
};
await redisClient.set(userKey, JSON.stringify(userSession));
if (!getActiveUserByObjectId(_id)) {
console.log("New session!");
addToActiveUsers(userSession);
console.log("Active Users: ", getActiveUsers());
} else {
console.log("Prev Disconnected, New session!");
const prevSocketId = getActiveUserByObjectId(_id)?.socketId;
if (io.sockets.sockets.get(prevSocketId)) {
console.log(prevSocketId + "multipleSession disconnected");
io.sockets.sockets.get(prevSocketId).emit("multipleSession");
io.sockets.sockets.get(prevSocketId).disconnect(true);
}
removeActiveUserByObjectId(_id);
addToActiveUsers(userSession);
}
};


