In a Node app, I'm trying to recursively query a database and then process the rows until a COUNT query returns 0. I've set up a recursive function that runs the query, then reads the count from the results. The recursion only happens twice, no matter what COUNT returns the third time. I'm guessing this has to do with the scope of the variables involved, but I can't figure it out.
sf/sforce and db/dbase are connected class instances that wrap jsforce and mysql2.
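For context, the wrappers are set up roughly like this (a simplified sketch of my setup; the class names, options, and credentials here are placeholders, not the real implementations):

const mysql = require('mysql2');
const jsforce = require('jsforce');

// Illustrative only: the real dbase/sforce classes do more than this.
class Dbase {
    constructor() {
        // mysql2 connection; .promise() is called on it in queryMore below
        this.connection = mysql.createConnection({
            host: 'localhost',
            user: 'user',
            password: 'password',
            database: 'claims_db'   // placeholder schema name
        });
    }
}

class Sforce {
    constructor() {
        // jsforce connection used to create the Bulk API 2.0 jobs
        this.conn = new jsforce.Connection({ loginUrl: 'https://login.salesforce.com' });
    }
}

let dbase = new Dbase();
let sforce = new Sforce();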
Recursive function
let queryMore = (dbase, sforce, callback) => {
    let db = dbase;
    let sf = sforce;
    let cb = callback;
    // Count how many employer rows are still waiting to be sent
    db.connection.promise().query("SELECT COUNT(*) FROM STATUS WHERE STATUS = '" + constants.statusOption.NOT_SENT + "' AND Type = '" + constants.typeOption.EMPLOYER + "';")
        .then((results) => {
            let count = results[0][0]['COUNT(*)'];
            console.log(count);
            if (count !== 0) {
                // Push the next batch to Salesforce, write the results back, then recurse
                sendToSalesforceAndUpdateRows(db, sf).then(() => {
                    queryMore(db, sf, cb);
                })
                .catch(error => {
                    throw error;
                });
            } else {
                // Nothing left to send
                cb();
            }
        })
        .catch(error => {
            throw error;
        });
}
queryMore calls sendToSalesforceAndUpdateRows and its associated functions:
let sendToSalesforceAndUpdateRows = (db, sf) => {
    return new Promise((resolve, reject) => {
        sendToSalesforce(db, sf)
            .then(({successfulStream, failedStream}) => {
                try {
                    console.log('sent. Updating rows');
                    let promises = [updateRecordsInDbase(successfulStream, db), updateRecordsInDbase(failedStream, db)];
                    Promise.all(promises)
                        .then(() => {
                            console.log('Updated Records');
                            resolve();
                        })
                        .catch(err => {
                            reject(err);
                        });
                } catch (updateErr) {
                    reject(updateErr);
                }
            })
            .catch(error => {
                reject(error);
            });
    });
}
let sendToSalesforce = (db, sf) => {
    return new Promise(async (resolve, reject) => {
        let job = sf.createBulk2JobForStatus();
        try {
            job.on('inProgress', (jobinfo) => {
                console.log('Job Info: ' + JSON.stringify(jobinfo));
            });
            job.on('error', (error) => {
                console.log('Job Error');
                throw error;
            });
            console.log('opening job');
            await job.open();
            console.log('job opened');
            job.uploadData(db.queryRowsToPushAsStream(constants.typeOption.EMPLOYER)
                .stream()
                .pipe(prepRows)
                .pipe(csv.stringify({
                    header : true,
                    columns : ['Claim_Number__c', 'File_Date__c', 'JSON__c', 'MySQL_Id__c', 'Type__c']
                })))
                .then(async () => {
                    await job.close();
                    await job.poll(3000, 600000);
                    let successfulStream = await sf.getSuccessfulResultsAsStream(job);
                    let failedStream = await sf.getFailedResultsAsStream(job);
                    resolve({successfulStream, failedStream});
                });
        } catch (e) {
            await job.close();
            reject(e);
        }
    });
}
let updateRecordsInDbase = (recordStream, db) => {
    return new Promise((resolve, reject) => {
        try {
            db.updateStatusRowsFromTStream(recordStream
                .pipe(chunkResults), () => {
                    resolve();
                });
        } catch (error) {
            reject(error);
        }
    });
}
The row query is limited to returning 10,000 records at a time because of heap-size limitations when fetching results from the Salesforce Bulk API, but I will routinely have more than 30k rows to send, so I want to chain these batches together and keep writing results back to the database along the way.
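For reference, the flow I am after is equivalent to this loop (just a sketch of the intent using async/await; countNotSent is a hypothetical stand-in for the COUNT(*) query inside queryMore, not an existing helper):

let processAllBatches = async (db, sf, cb) => {
    // countNotSent: hypothetical helper standing in for the COUNT(*) query above
    let count = await countNotSent(db);
    while (count !== 0) {
        // push one batch of up to 10,000 rows and write the results back
        await sendToSalesforceAndUpdateRows(db, sf);
        count = await countNotSent(db);
    }
    cb();
};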
What is supposed to happen for 30k records:
queryMore =>
count = 30000 =>
sendToSalesforceAndUpdateRows =>
queryMore =>
count = 20000 =>
sendToSalesforceAndUpdateRows =>
queryMore =>
count = 10000 =>
sendToSalesforceAndUpdateRows =>
queryMore =>
count = 0 =>
callback
What I am seeing instead:
queryMore =>
count = 30000 =>
sendToSalesforceAndUpdateRows =>
queryMore =>
count = 20000 =>
sendToSalesforceAndUpdateRows =>
callback