Custom cron function is not working

I have attached the following function to my cron job, which runs every 10 minutes. The problem is that the function is not running in my staging environment (it was running perfectly fine on my local machine). I believe the problem resides within the function `_fetchIdToProcess`, since the rest of the code depends on the length of the array it returns. I am also using Bluebird for "promisifying" the Redis calls.

const CacheUtils = require('../../utils/cache-utils');
const ESUtils = require('../../utils/es-utils');
const esClient = ESUtils.initializeESConnection();


/**
 * @description This cron job fetches all the event docs that need to be merged, grouped by
 * fileGroupId; after merging, all the leftover "dangling docs" are deleted.
 */


module.exports = class MergeFileEvents {
  static async mergeFileEvent() {
    const client = await CacheUtils.getClient();
    const fileGroupIds = await this._fetchIdToProcess('file_upload*');

    if (fileGroupIds.length > 0) {

      // locking the fileGroupIds , for preventing dirty reads

      await client.saddAsync('processing_file_ids',fileGroupIds);

      // main implementation for merging docs
      await this._mergeDocs(fileGroupIds);

      // releasing the locks (deleting the ids) 
      await this._releaseLockFileGroupIds(fileGroupIds)
    }
  }


  static async _fetchIdToProcess(pattern) {
    const client = await CacheUtils.getClient();
    let keys = [];
    let cursor = '0';

    do {
      const reply = await client.scanAsync(cursor, 'MATCH', pattern, 'COUNT', 1000);
      cursor = reply[0];
      keys = keys.concat(reply[1]);
    } while (cursor !== '0');

    const processingIds = await client.smembersAsync('processing_file_ids') || [];

    return keys.filter(key => !processingIds.includes(key));
  }


  static async _mergeDocs(fileGroupIds) {
    const date = new Date();
    const client =await CacheUtils.getClient();
    for (let id of fileGroupIds) {
      const eventIds = await client.smembersAsync(id);

      if (eventIds.length <= 1) continue; // no need to merge in this case 
    
      const  body  = await esClient.search({
        index: `events_${date.toJSON().split('T')[0]}`,
        body: {
          query: { ids: { values: eventIds } },
          _source: ['file.id'],
          size: eventIds.length
        }
      });
      
      const fileIdsPayload = await Promise.all(eventIds.map(async (id) => {
        const createdAt = await client.getAsync(id); 
          return{
             id: body.hits.hits.find(hit => hit._id == id)?._source.file.id,
             createdAt
          }
      }));

      const lastUpdate = fileIdsPayload.reduce((max, curr) => {
        return new Date(curr.createdAt) > new Date(max.createdAt) ? curr : max;
      }).createdAt;


      await esClient.update({
        index: `events_${date.toJSON().split('T')[0]}`,
        id: eventIds[0],
        body: {
          doc: {
            files: fileIdsPayload,
            updatedAt: lastUpdate
          }
        }
      });

      const deleteEvents = eventIds.slice(1).map(id => ({
        delete: {
          _index: `events_${date.toJSON().split('T')[0]}`,
          _id: id
        }
      }));

      await esClient.bulk({
        body: deleteEvents
      });

    }
  }

  static async _releaseLockFileGroupIds(fileGroupIds) {

    const client =await CacheUtils.getClient();

    // deleting fileGroupIds from processing set
    await client.sremAsync('processing_file_ids',fileGroupIds);

    // deleting timestamps of eventId
    for (let id of fileGroupIds) {
      await client.delAsync(id);
    }
  }
}