I have attached the following function to my cron, which runs every 10 minutes. The problem is that the function is not running on my stage environment (it was running perfectly fine locally). I believe the problem resides within _fetchIdToProcess, since the rest of the code depends on the length of the array it returns. I'm also using bluebird for promisifying the redis calls.
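For reference, the promisified client comes from a helper along these lines (a minimal sketch only; the file layout and connection config here are placeholders, not my exact code):

// cache-utils.js (sketch): bluebird adds an *Async variant of every redis command,
// which is where scanAsync / smembersAsync / saddAsync etc. come from
const redis = require('redis');
const bluebird = require('bluebird');

bluebird.promisifyAll(redis.RedisClient.prototype);
bluebird.promisifyAll(redis.Multi.prototype);

let client;
module.exports.getClient = async () => {
  if (!client) {
    client = redis.createClient(process.env.REDIS_URL); // placeholder config
  }
  return client;
};

And here is the cron function itself: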
const CacheUtils = require('../../utils/cache-utils');
const ESUtils = require('../../utils/es-utils');
const esClient = ESUtils.initializeESConnection();
/**
 * @description This cron fetches all the event docs that need to be merged on the
 * basis of fileGroupId; after merging, all the "dangling" docs are deleted.
 */
module.exports = class MergeFileEvents {
  static async mergeFileEvent() {
    const client = await CacheUtils.getClient();
    const fileGroupIds = await this._fetchIdToProcess('file_upload*');
    if (fileGroupIds.length > 0) {
      // lock the fileGroupIds to prevent dirty reads by a concurrent run
      await client.saddAsync('processing_file_ids', fileGroupIds);
      // main implementation for merging docs
      await this._mergeDocs(fileGroupIds);
      // release the locks (delete the ids from the processing set)
      await this._releaseLockFileGroupIds(fileGroupIds);
    }
  }
  static async _fetchIdToProcess(pattern) {
    const client = await CacheUtils.getClient();
    let keys = [];
    let cursor = '0';
    // SCAN the keyspace for keys matching the pattern (cursor-based, non-blocking)
    do {
      const reply = await client.scanAsync(cursor, 'MATCH', pattern, 'COUNT', 1000);
      cursor = reply[0];
      keys = keys.concat(reply[1]);
    } while (cursor !== '0');
    // drop any ids that are already locked by a previous run
    const processingIds = await client.smembersAsync('processing_file_ids') || [];
    return keys.filter(key => !processingIds.includes(key));
  }
  static async _mergeDocs(fileGroupIds) {
    const client = await CacheUtils.getClient();
    // events live in a daily index, e.g. events_2023-01-31
    const indexName = `events_${new Date().toJSON().split('T')[0]}`;
    for (const fileGroupId of fileGroupIds) {
      const eventIds = await client.smembersAsync(fileGroupId);
      if (eventIds.length <= 1) continue; // nothing to merge for a single event
      const body = await esClient.search({
        index: indexName,
        body: {
          query: { ids: { values: eventIds } },
          _source: ['file.id'],
          size: eventIds.length
        }
      });
      // pair every event's file.id with its createdAt timestamp stored in redis
      const fileIdsPayload = await Promise.all(eventIds.map(async (eventId) => {
        const createdAt = await client.getAsync(eventId);
        return {
          id: body.hits.hits.find(hit => hit._id === eventId)?._source.file.id,
          createdAt
        };
      }));
      const lastUpdate = fileIdsPayload.reduce((max, curr) => {
        return new Date(curr.createdAt) > new Date(max.createdAt) ? curr : max;
      }).createdAt;
      // merge everything into the first event ...
      await esClient.update({
        index: indexName,
        id: eventIds[0],
        body: {
          doc: {
            files: fileIdsPayload,
            updatedAt: lastUpdate
          }
        }
      });
      // ... and bulk-delete the remaining "dangling" docs
      const deleteEvents = eventIds.slice(1).map(eventId => ({
        delete: {
          _index: indexName,
          _id: eventId
        }
      }));
      await esClient.bulk({
        body: deleteEvents
      });
    }
  }
  static async _releaseLockFileGroupIds(fileGroupIds) {
    const client = await CacheUtils.getClient();
    // remove the fileGroupIds from the processing set
    await client.sremAsync('processing_file_ids', fileGroupIds);
    // delete the fileGroupId set keys so the same groups are not picked up again
    for (const id of fileGroupIds) {
      await client.delAsync(id);
    }
  }
}
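One thing I still need to rule out on stage: if an earlier run died between the saddAsync lock and the sremAsync release, every id would stay in processing_file_ids forever, so _fetchIdToProcess would keep returning an empty array and the cron would silently do nothing. A one-off snippet along these lines should show whether that is happening (the key names match the code above; the rest is just a debugging sketch):

const CacheUtils = require('../../utils/cache-utils');

(async () => {
  const client = await CacheUtils.getClient();
  // members that sit here while no cron run is in flight are stale locks
  const locked = await client.smembersAsync('processing_file_ids');
  console.log('currently locked fileGroupIds:', locked);
  // if they are stale, they can be cleared once with:
  // await client.delAsync('processing_file_ids');
  client.quit(); // close the connection so the script exits
})();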