I am trying to parse an enormous JSON stream in JavaScript using Node.js and store the results in MariaDB. As part of that, I have a table that will hold a small number of rows (~50), each with a unique value, so that if I run across a value while parsing, I don't need to add it to the DB if it's already there.
The problem is, those 50 values will be hit literally millions of times, so I don't really want to check the DB before every insert. If I can do this in one query, I'd be happier.
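For what it's worth, the single-query approach I've been reading about would look something like this. This is a sketch, not my actual code: the table and column names are made up, and it assumes the column has a UNIQUE constraint so the server itself refuses duplicates.

```javascript
// Hypothetical sketch: assumes a table body_types whose name column has a
// UNIQUE constraint. INSERT IGNORE turns a duplicate insert into a server-side
// no-op, so no client-side check (Set or SELECT) would be needed at all.
const bodyTypesInsert = 'INSERT IGNORE INTO body_types (name) VALUES (?)';

async function insertBodyType(bodyType, conn) {
  // Duplicate values are silently skipped by MariaDB instead of raising an error.
  await conn.query(bodyTypesInsert, [bodyType]);
}
```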
In order to do this, and since we're only talking about 50 values here, I use a Set. When I come across a value, I first check whether it's in the Set, and if not, I add it to the DB and then to the Set.
The problem is that occasionally the function runs a second time with the same value before the Set has been updated, so I end up attempting to insert a duplicate row. I've tried async/await, but I suspect that because the whole thing is wrapped in an async callback invoked from the top level of the file, the async/await chain breaks at some point and what should run sequentially no longer does.
Here’s the code I use to insert (DB stuff done via the mariadb connector):
DB file helpers
import mariadb from 'mariadb';

export const pool = mariadb.createPool({
  host: 'localhost',
  port: 3306,
  user: '*********',
  password: '**********'
});
export async function getBodyType(body, conn, map) {
  let subType = body.subType || "Other";
  if (subType && !map.has(subType)) {
    // Insert into the DB as we go, adding it to a set to ensure we don't duplicate
    db.insertBodyType(subType, conn)
      .then(result => {
        map.add(subType);
      });
  }
  return subType;
}
export async function insertBodyType(bodyType, conn) {
  try {
    await conn.query(queries.bodyTypesInsert, bodyType);
  } catch (err) {
    console.log(err);
  }
}
Before continuing, I’ve also tried the insert/add block as follows, which didn’t work either:
await db.insertBodyType(subType, conn);
map.add(subType);
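To illustrate the window I think I'm hitting, here's a self-contained sketch (fakeInsert and recordType are stand-in names, and fakeInsert simulates conn.query() with a timer). The one ordering I haven't tried is updating the Set synchronously before the first await, so that a second overlapping call with the same value sees it immediately:

```javascript
// Self-contained sketch of the check-then-insert ordering.
const seen = new Set();
let insertCount = 0;

function fakeInsert(value) {
  // Stands in for conn.query(); resolves on a later tick like a real DB call.
  return new Promise(resolve => setTimeout(() => { insertCount++; resolve(); }, 10));
}

async function recordType(value) {
  if (seen.has(value)) return;
  seen.add(value);        // synchronous: an overlapping call can't sneak past this
  try {
    await fakeInsert(value);
  } catch (err) {
    seen.delete(value);   // undo the reservation if the insert failed
    throw err;
  }
}

// Two overlapping calls with the same value -> only one insert.
Promise.all([recordType('Other'), recordType('Other')])
  .then(() => console.log(insertCount)); // prints 1
```

With the Set updated before the await, the duplicate attempt never reaches the DB even when the calls interleave.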
Now here’s the way I call the above function, which is not itself in a function (it’s just the main body of the script):
let types = new Set();
const stream = fs.createReadStream(inputFile, 'utf8');
const parser = JSONStream.parse('*');
stream.pipe(parser)
  // I'm wondering if this is the culprit -- I don't know how to make the call to
  // stream.pipe async, or even if I can, so I don't know whether making its body
  // async even matters.
  .on('data', async (system) => {
    // Array of values we need for the system insert
    let systemData = [
      system.name,
      system.coords.x,
      system.coords.y,
      system.coords.z,
      helpers.isColonizable(system),
      helpers.isPopulated(system),
      helpers.canColonizeFrom(system)
    ];
    let bodyMap = new Map();
    // Somehow this sometimes calls twice before the first one finishes, despite the
    // fact that the whole chain from getBodyType() on up should be async
    for (let body of system.bodies) {
      let bodyType = await helpers.getBodyType(body, conn, types);
      if (!bodyMap.has(bodyType)) {
        bodyMap.set(bodyType, 0);
      }
      bodyMap.set(bodyType, bodyMap.get(bodyType) + 1);
    }
  })
  .on('end', () => {
    db.pool.end();
  })
  .on('error', (err) => {
    console.error(err);
    db.pool.end();
  });
This whole thing is run in node:
$ node do_this.js
Honestly, from what I'm reading about async/await here, I'm wondering if I'm even using the right tool for the job. I might be better off with something like C#, whose async methods can give me truly sequential behavior, assuming I've read things right.