An app architecture decision was made to fork processes to achieve high-load, non-blocking transaction updates. Now, we’re contending with ipc serialization ‘challenges’ that stem from the serialization of Uint8Array(3) {fixed-length} primary keys into node.js { type: ‘Buffer’, data: [255, 255, 255] } as an example.
The data model uses pk keys in uin8array form primarily for access to the database. The question: is there a way to prevent deserialization or a fast, efficient, or better way to preserve or return structures received over ipc to their native form?
The following samples show this condition:
Data Sent
sent: {
instrument: <Buffer cb 42 a5>,
symbol: 'XRP-USDT',
base_currency: <Buffer b5 cd b2>,
base_symbol: 'XRP',
quote_currency: <Buffer 35 0f e2>,
quote_symbol: 'USDT',
period: <Buffer ca 34 49>,
timeframe: '15m',
timeframe_units: 15,
bulk_collection_rate: 1440,
interval_collection_rate: 4,
sma_factor: 60,
digits: 5,
trade_state: <Buffer 7c 7a b8>,
state: 'Enabled',
active_collection: 1,
suspense: 0
}
Data received:
Child process PID: 1339310
In Child: Recieved: {
instrument: { type: 'Buffer', data: [ 203, 66, 165 ] },
symbol: 'XRP-USDT',
base_currency: { type: 'Buffer', data: [ 181, 205, 178 ] },
base_symbol: 'XRP',
quote_currency: { type: 'Buffer', data: [ 53, 15, 226 ] },
quote_symbol: 'USDT',
period: { type: 'Buffer', data: [ 202, 52, 73 ] },
timeframe: '15m',
timeframe_units: 15,
bulk_collection_rate: 1440,
interval_collection_rate: 4,
sma_factor: 60,
digits: 5,
trade_state: { type: 'Buffer', data: [ 124, 122, 184 ] },
state: 'Enabled',
active_collection: 1,
suspense: 0
}
The parent process is straight forward enough, collect and open a child for each enabled instrument; then, each process will attach to a websocket for updates.
The parent:
"use strict";
import { fork } from "child_process";
import { State } from "@db/interfaces/trade_state";
import * as InstrumentPeriod from "@db/interfaces/instrument_period";
import type { IInstrumentPeriod } from "@/db/interfaces/instrument_period";
//+--------------------------------------------------------------------------------------+
//| CProcess - Order Processing Class/Container |
//+--------------------------------------------------------------------------------------+
export class CProcess {
//+------------------------------------------------------------------------------------+
//| Start - Loads order class array, syncs bar history, processes orders |
//+------------------------------------------------------------------------------------+
async Start() {
const start:Array<Partial<IInstrumentPeriod>> = await InstrumentPeriod.Fetch({ symbol: "XRP", state: State.Enabled });
start.forEach((instrument) => {
console.log('sent:', instrument)
const child = fork("./class/child.ts");
child.send({ type: "init", data: instrument });
child.send({type: 'update'});
console.log(`Child process PID: ${child.pid}`);
child.on("message", (message) => {
console.log(`Report from child process ${child.pid}:`, message);
});
child.on("exit", (code) => {
console.log(`Child process ${child.pid} exited with code ${code}`);
});
})
}
}
The child
"use strict";
import { COrder } from "@class/order";
import { CFractal } from "@class/fractal";
import { State } from "@db/interfaces/trade_state";
import type { IInstrumentPeriod } from "@db/interfaces/instrument_period";
import type { IInstrument } from "@db/interfaces/instrument";
import type { ICandle } from "@db/interfaces/candle";
import * as InstrumentPeriod from "@db/interfaces/instrument_period";
import * as Instrument from "@db/interfaces/instrument";
import * as Candle from "@db/interfaces/candle";
//+--------------------------------------------------------------------------------------+
//| CChild - Order Processing Class/Container |
//+--------------------------------------------------------------------------------------+
export class CChild {
private Order: Array<COrder> = [];
private Fractal: Array<CFractal> = [];
}
interface IRequest {
type: string;
data: IInstrumentPeriod;
}
interface IResult {
symbol: string | string[]
status: string;
data: number;
error: string;
}
const app = [];
//+------------------------------------------------------------------------------------+
//| Handle instrument start |
//+------------------------------------------------------------------------------------+
process.on("message", async (message:IRequest) => {
const result:IResult = { symbol: "", status: "", data: 0, error: "" };
if (message.type === "init") {
const instrument:IInstrumentPeriod = message.data;
console.log("In Child: Recieved: ",instrument);
const [fractal] = await Instrument.Fetch({symbol: instrument.symbol});
const props: Candle.IKeyProps = {
instrument: fractal.instrument!,
period: fractal.trade_period!,
symbol: fractal.symbol!,
timeframe: instrument.timeframe!,
};
console.log("fractal;",fractal, "props:",props)
const candles: Array<Partial<ICandle>> = await Candle.Fetch(props, 10);
const appclass = new CFractal(fractal,candles);
app.push(appclass);
result.symbol = instrument.symbol!;
result.status = "ok";
}
if (message.type === 'update') {
console.log("classes loaded: ", app.length)
}
if (typeof process.send !== "undefined") {
process.send(result);
}
});