I have around 200,000 products in my MongoDB database. I need to write a script to remove products that are no longer available on Envato and replace them with new ones. I’ve written the following script to accomplish this, but I’m looking for suggestions to improve its efficiency and robustness.
The main script iterates over the products, checks each one's availability on Envato via `envatoApi`, and replaces unavailable products with similar or new items. It processes products in batches, tracks its progress in Redis, and handles API rate limiting and retries.
Here’s my main script:
import { envatoApi } from '#apis';
import { productService, redisService, websiteService } from '#services';
import AppError from '#shared/AppError';
import { convert } from 'html-to-text';
import slugify from 'slugify';
import Model from '#models/Product';
import { Mutex } from 'async-mutex';
// Tunables for the product clean-up job.
const config = {
  // Mongo scan: products are handed to processBatch in chunks of BATCH_SIZE;
  // SKIP_COUNT products are skipped when no progress checkpoint exists yet.
  BATCH_SIZE: 500,
  SKIP_COUNT: 0,

  // Envato API throttling: products checked per slice, and the pause (ms) after each slice.
  API_BATCH_SIZE: 5,
  API_DELAY: 3000,

  // How many times a replacement lookup is attempted for one unavailable product.
  MAX_ATTEMPTS: 10,

  // Redis keys used to checkpoint progress (Mongo scan position / Envato search page).
  REDIS_KEY: 'processed_count',
  REDIS_PAGE_KEY: 'current_page',
};
// Lock taken around the "does a product with this slug already exist?" lookup in
// fetchSimilarOrNewItem.
// NOTE(review): processBatch checks products one at a time, and the lock is released
// before the new product is actually created (updateProductReferences), so it does
// not by itself prevent duplicate slugs; a unique index on `slug` would be the
// reliable guard.
const mutex = new Mutex();
/**
 * Re-validates every product in `category` against Envato and replaces (or unlinks)
 * the ones that are no longer available.
 *
 * Products are streamed from Mongo in ascending `_id` order and handed to
 * `processBatch` in chunks of `config.BATCH_SIZE`. After each chunk the `_id` of its
 * last product is checkpointed in Redis, so an interrupted run resumes right after the
 * last fully processed chunk. Keyset resumption (`_id > lastId`) is used instead of
 * `skip(count)` because processing deletes products, which shifts count-based offsets
 * and would silently skip unprocessed items.
 *
 * Checkpoint keys are scoped per category and reset once the category has been fully
 * processed, so the next run starts from the beginning again.
 *
 * @param {string} category - Value matched against the products' `category` field.
 * @returns {Promise<void>}
 * @throws {AppError} If processing fails for a reason other than a lost server-side
 *   cursor, or if the cursor is lost more than MAX_CURSOR_REOPENS times.
 */
const removeOldProductsAndAddNew = async category => {
  const MAX_CURSOR_REOPENS = 5;
  const countKey = `${config.REDIS_KEY}:${category}`;
  const lastIdKey = `${config.REDIS_KEY}:${category}:last_id`;
  const filter = { category: { $in: [category] } };

  let processedCount = Number.parseInt((await redisService.get(countKey)) ?? '0', 10) || 0;
  // Stored as a string; mongoose casts it back to an ObjectId inside the `_id` query.
  let lastId = (await redisService.get(lastIdKey)) || null;
  let noOfBatches = 0;

  // Processes one Mongo batch, then checkpoints progress so a crash never redoes it.
  const commitBatch = async batch => {
    await processBatch(batch);
    noOfBatches++;
    processedCount += batch.length;
    lastId = String(batch[batch.length - 1]._id);
    await redisService.set(countKey, processedCount);
    await redisService.set(lastIdKey, lastId);
    console.log(`Mongo batch ${noOfBatches} processed (${processedCount} products so far).`);
  };

  try {
    console.log(lastId ? `Resuming after product ${lastId} (${processedCount} already processed)` : 'Starting a fresh run');

    // Snapshot the newest product up front: replacements created during this run get
    // larger ObjectIds and must not be re-validated by the same run.
    const newest = await Model.findOne(filter).sort({ _id: -1 }).select('_id').lean();
    if (!newest) {
      console.log(`No products found for category ${category}.`);
      return;
    }

    // Opens a cursor positioned right after the last committed product.
    const openCursor = () => {
      const idRange = { $lte: newest._id };
      if (lastId) idRange.$gt = lastId;
      const query = Model.find({ ...filter, _id: idRange }).sort({ _id: 1 });
      // SKIP_COUNT only seeds a fresh run; a resumed run already starts after lastId.
      return (lastId ? query : query.skip(config.SKIP_COUNT)).cursor();
    };

    let reopenCount = 0;
    for (;;) {
      const cursor = openCursor();
      // Uncommitted docs are discarded on a reopen; they are re-read from lastId.
      let batch = [];
      try {
        for await (const doc of cursor) {
          batch.push(doc);
          if (batch.length >= config.BATCH_SIZE) {
            await commitBatch(batch);
            batch = [];
          }
        }
        if (batch.length > 0) await commitBatch(batch);
        break;
      } catch (error) {
        // Code 43 is CursorNotFound: the server discarded the cursor (e.g. it sat idle
        // too long while a batch was being processed). Anything else is fatal.
        if (error?.code !== 43 || reopenCount >= MAX_CURSOR_REOPENS) throw error;
        reopenCount++;
        console.warn(`Cursor not found, reopening cursor after ${lastId ?? 'start'} (attempt ${reopenCount})...`);
      } finally {
        // Always close the cursor opened in THIS iteration, never its replacement.
        try {
          await cursor.close();
        } catch (closeError) {
          console.error('Error closing cursor:', closeError);
        }
      }
    }

    // Reset the checkpoint so the next run re-validates the whole category.
    await redisService.set(lastIdKey, '');
    await redisService.set(countKey, 0);
    console.log(`Processing completed. Number of Mongo batches processed: ${noOfBatches}`);
  } catch (error) {
    console.error('Error processing batches:', error);
    if (error instanceof AppError) throw error;
    throw new AppError(error?.message || error?.description || 'An unexpected error occurred', error?.statusCode || 500);
  }
};
/**
 * Validates one Mongo batch of products against Envato in small, throttled slices.
 *
 * Products are checked one after another, `config.API_BATCH_SIZE` per slice, and the
 * loop pauses `config.API_DELAY` ms after every slice (presumably to stay under
 * Envato's rate limit). Errors thrown by `removeOldProducts` propagate and abort the batch.
 *
 * @param {Array<object>} batch - Product documents to validate.
 * @returns {Promise<void>}
 */
const processBatch = async batch => {
  for (let start = 0; start < batch.length; start += config.API_BATCH_SIZE) {
    const slice = batch.slice(start, start + config.API_BATCH_SIZE);
    // Report the real position: the last slice may be shorter than API_BATCH_SIZE.
    console.log(`Processing API requests: ${start + slice.length}`);
    for (const product of slice) {
      await removeOldProducts(product);
    }
    await delay(config.API_DELAY);
  }
};
/**
 * Returns a promise that resolves (with `undefined`) after `ms` milliseconds.
 *
 * @param {number} ms - Time to wait.
 * @returns {Promise<void>}
 */
const delay = ms =>
  new Promise(resolve => {
    setTimeout(resolve, ms);
  });
/**
 * Checks one product against Envato and triggers its replacement when Envato reports
 * the item as gone (error 404).
 *
 * Products whose `externalId` cannot be parsed into a site and an item id are skipped
 * with a warning instead of being looked up: a request without a real id could come
 * back as 404 and cause the local product to be deleted.
 *
 * @param {object} product - Product document; `externalId` is expected as `<site>_<id>`.
 * @returns {Promise<void>}
 * @throws {AppError} For any Envato lookup error other than 404.
 */
const removeOldProducts = async product => {
  const [site, externalId] = parseExternalId(product?.externalId);
  if (!site || !externalId) {
    console.warn(`Skipping product ${product?._id}: cannot parse externalId "${product?.externalId}"`);
    return;
  }
  try {
    await envatoApi.getItemDetail(externalId);
  } catch (error) {
    if (error?.error !== 404) {
      throw new AppError(error?.description || 'An unexpected error occurred', error?.error || 500);
    }
    console.log('Product not found, initiating replacement...');
    await handleProductReplacement({ id: externalId, site }, product);
  }
};
/**
 * Splits a stored external id of the form `<site>_<itemId>` into its two parts.
 *
 * The split happens at the LAST underscore, so a site value that itself contains `_`
 * stays intact. Missing, non-string or malformed values (no separator, empty site or
 * empty item id) yield `[null, null]` so callers can skip the product.
 *
 * @param {string|null|undefined} externalId - Value of the product's `externalId` field.
 * @returns {[string, string] | [null, null]} `[site, itemId]`.
 */
const parseExternalId = externalId => {
  if (typeof externalId !== 'string') return [null, null];
  const separator = externalId.lastIndexOf('_');
  if (separator <= 0 || separator === externalId.length - 1) return [null, null];
  return [externalId.slice(0, separator), externalId.slice(separator + 1)];
};
/**
 * Replaces an unavailable product with a freshly imported Envato item.
 *
 * Up to `config.MAX_ATTEMPTS` lookups are made for a replacement; only 404 errors from
 * the lookup are retried. On success the new product is created and swapped into every
 * website that referenced the old one. If no replacement is found, the old product is
 * pulled from all websites. In both cases the old product is deleted exactly once and
 * only AFTER the website references were updated, so a failure part-way through never
 * leaves websites pointing at a deleted product.
 *
 * @param {{ id: string, site: string }} source - Envato item id and site of the unavailable item.
 * @param {object} product - The local product document being replaced.
 * @returns {Promise<{ message: string }>}
 * @throws {AppError} For any lookup error other than 404 (the old product is left untouched).
 */
const handleProductReplacement = async ({ id, site }, product) => {
  for (let attempt = 1; attempt <= config.MAX_ATTEMPTS; attempt++) {
    let newProduct = null;
    try {
      newProduct = await fetchSimilarOrNewItem(id, site, product);
    } catch (error) {
      const errorCode = error?.error || error?.code || error?.statusCode || 500;
      const errorMessage = error?.message || 'An unexpected error occurred';
      if (errorCode !== 404) {
        throw new AppError(errorMessage, errorCode);
      }
      console.log(`404 Error, item not found, retrying... Attempts: ${attempt}`);
      continue;
    }
    if (newProduct) {
      await updateProductReferences(product, newProduct);
      await productService.remove(product._id);
      return { message: 'Product updated successfully' };
    }
  }
  console.log(`Maximum attempts reached for product ${product._id}, removing from all websites.`);
  await websiteService.collection.updateMany({ products: { $in: [product._id] } }, { $pull: { products: product._id } });
  await productService.remove(product._id);
  return { message: 'Product removed after maximum attempts' };
};
/**
 * Cheap pre-check: is this search hit already stored locally, either under the same
 * external id or under the slug its name would produce? Avoids an Envato detail call
 * for items that would be rejected anyway.
 * NOTE(review): assumes search hits carry the same `site` value as item details — confirm.
 *
 * @param {object} item - One entry of `getSimilarItems(...).matches`.
 * @returns {Promise<boolean>}
 */
const isAlreadyImported = async item => {
  const conditions = [];
  if (item?.site && item?.id) conditions.push({ externalId: `${item.site}_${item.id}` });
  if (item?.name) conditions.push({ slug: slugify(item.name, { lower: true, strict: true }) });
  if (conditions.length === 0) return false;
  return Boolean(await productService.collection.findOne({ $or: conditions }));
};

/**
 * Finds an Envato item that is not yet stored locally and builds product data from it.
 *
 * Walks the `getSimilarItems` search pages for `site`, starting at the page that was
 * checkpointed in Redis (per site) by the previous successful call, and returns the
 * first item whose slug is not taken. Items already stored are skipped before their
 * details are fetched, so re-scanning a page costs Mongo lookups instead of Envato calls.
 *
 * @param {string} id - Envato id of the unavailable item (not used by the search).
 * @param {string} site - Envato site identifier parsed from the product's externalId.
 * @param {object} product - Product being replaced; its `category` is copied over.
 * @returns {Promise<object>} Plain product data ready for `productService.create`.
 * @throws {AppError} 400 when the search pages run out without a usable item.
 */
const fetchSimilarOrNewItem = async (id, site, product) => {
  // Scoped per site: page numbers of one site's search results mean nothing for another.
  const pageKey = `${config.REDIS_PAGE_KEY}:${site}`;
  let page = parseInt(await redisService.get(pageKey), 10) || 1;
  for (;;) {
    console.log('Processing page', page);
    const similarItems = await envatoApi.getSimilarItems(site, page);
    const matches = similarItems?.matches;
    if (!Array.isArray(matches) || matches.length === 0) {
      console.log('No similar items found. Exiting loop.');
      break;
    }
    for (const item of matches) {
      try {
        if (await isAlreadyImported(item)) continue;
        const newProduct = await createProductFromItem(item, product.category);
        const release = await mutex.acquire();
        try {
          const existingProduct = await productService.collection.findOne({ slug: newProduct.slug });
          if (!existingProduct) {
            console.log(`New product found: ${newProduct.slug}`);
            await redisService.set(pageKey, page);
            return newProduct;
          }
          console.log(`Product with slug ${newProduct.slug} already exists, trying another...`);
        } finally {
          release();
        }
      } catch (error) {
        // Best-effort per item: one bad search hit must not abort the whole search.
        console.error('Error adding product from item:', error);
      }
    }
    page++;
  }
  throw new AppError('No valid similar or new item found', 400);
};
/**
 * Collapses every run of whitespace (spaces, tabs, newlines) into one space and trims.
 *
 * @param {string} text
 * @returns {string}
 */
const collapseWhitespace = text => text.replace(/\s+/g, ' ').trim();

/**
 * Builds plain product data from an Envato search hit by fetching the item's full details.
 *
 * The HTML description is converted to plain text and its whitespace collapsed onto a
 * single line. (The previous `/n/g` and `/s+/g` patterns lacked their backslashes and
 * stripped every letter "n" and run of "s" out of descriptions.)
 *
 * @param {object} item - Search hit; only `item.id` is used.
 * @param {*} category - Category copied verbatim onto the new product.
 * @returns {Promise<object>} Product data ready for `productService.create`.
 * @throws {AppError} 400 if the item has no id or its details lack id/name/url/site.
 */
const createProductFromItem = async (item, category) => {
  if (!item || !item.id) {
    console.error('Invalid item object:', item);
    throw new AppError('Invalid item object', 400);
  }
  const { id: newProductId, name, url, site, attributes, description, price_cents, previews } = await envatoApi.getItemDetail(item.id);
  if (!newProductId || !name || !url || !site) {
    console.error('Missing necessary product details:', { newProductId, name, url, site });
    throw new AppError('Missing necessary product details from item', 400);
  }
  const iconUrl = previews?.icon_preview?.icon_url;
  return {
    name,
    source: site,
    slug: slugify(name, { lower: true, strict: true }),
    reference: url,
    externalId: `${site}_${newProductId}`,
    attributes,
    price: price_cents / 100,
    description: collapseWhitespace(convert(description ?? '', { wordwrap: false })),
    previews,
    // Avoid storing `[undefined]` when the item has no icon preview.
    images: iconUrl ? [iconUrl] : [],
    category,
  };
};
/**
 * Persists `newProduct` and swaps it in for `oldProduct` on every website that lists it.
 *
 * The swap is a pipeline-style update: the old id is filtered out of each matching
 * website's `products` array and the new product's id is appended at the end.
 *
 * @param {object} oldProduct - Product being replaced (only `_id` is used).
 * @param {object} newProduct - Plain product data to create.
 * @returns {Promise<void>}
 */
const updateProductReferences = async (oldProduct, newProduct) => {
  const created = await productService.create(newProduct);
  const oldId = oldProduct._id;

  const productsWithoutOld = {
    $filter: {
      input: '$products',
      as: 'product',
      cond: { $ne: ['$$product', oldId] },
    },
  };
  const swapStage = {
    $set: {
      products: { $concatArrays: [productsWithoutOld, [created._id]] },
    },
  };

  await websiteService.collection.updateMany({ products: { $in: [oldId] } }, [swapStage]);
  console.log('Product references updated successfully.');
};
// Public API of the clean-up job: only the per-category entry point is exported.
export default {
removeOldProductsAndAddNew,
};
Here is the `envatoApi` module used by the script:
import axios from 'axios';
import config from '#config';
// Shared Envato API client; every request carries the bearer token from #config.
// A timeout is set because axios waits indefinitely by default, and a single hung
// request would otherwise stall a long-running batch job forever. It can be
// overridden with `config.apis.envato.timeout` (ms).
const instance = axios.create({
  baseURL: 'https://api.envato.com',
  timeout: config.apis.envato.timeout ?? 30000,
  headers: {
    Authorization: `Bearer ${config.apis.envato.apiKey}`,
  },
});
/**
 * Success interceptor: hands callers the parsed response body instead of the axios response.
 *
 * @param {{ data: * }} response - Axios response object.
 * @returns {*} The response body.
 */
const onFulfilled = ({ data }) => data;
// Maximum automatic retries of a request that Envato answered with HTTP 429.
const MAX_RATE_LIMIT_RETRIES = 3;

/**
 * Failure interceptor: retries rate-limited requests and normalises every other failure.
 *
 * HTTP 429 responses are retried up to MAX_RATE_LIMIT_RETRIES times. The wait is the
 * `Retry-After` header (in seconds) when present, otherwise 5 s, 10 s, 20 s. Every other
 * failure rejects with an Error carrying the fields callers inspect:
 * - `error`: the Envato body's `error` if present, else the HTTP status, else 500;
 * - `description`: the Envato body's `description`, else the axios message;
 * - `status` / `code`: the raw HTTP status and axios error code.
 * Requests that never got a response (timeouts, network failures) no longer crash
 * here on `err.response` being undefined.
 *
 * @param {object} err - Axios error.
 * @returns {Promise<*>} Result of a retried request; otherwise rejects.
 */
const onRejected = async err => {
  const response = err?.response;
  const requestConfig = err?.config;

  if (response?.status === 429 && requestConfig) {
    const retries = requestConfig.rateLimitRetries ?? 0;
    if (retries < MAX_RATE_LIMIT_RETRIES) {
      const retryAfterSeconds = Number(response.headers?.['retry-after']);
      const waitMs = Number.isFinite(retryAfterSeconds) && retryAfterSeconds > 0 ? retryAfterSeconds * 1000 : 2 ** retries * 5000;
      await new Promise(resolve => {
        setTimeout(resolve, waitMs);
      });
      // The retry count travels on the request config so it survives the re-request.
      return instance.request({ ...requestConfig, rateLimitRetries: retries + 1 });
    }
  }

  const { error, description } = response?.data ?? {};
  const normalized = new Error(description || err?.message || 'Envato API request failed');
  normalized.error = error ?? response?.status ?? 500;
  normalized.description = description ?? err?.message;
  normalized.status = response?.status;
  normalized.code = err?.code;
  throw normalized;
};
// Every request made through `instance` resolves via onFulfilled and rejects via onRejected.
instance.interceptors.response.use(onFulfilled, onRejected);
/**
 * Exchanges a purchase code for the item's download URL, asking Envato for a
 * shortened URL (`shorten_url: true`).
 *
 * @param {string} purchaseCode - Envato purchase code.
 * @returns {Promise<string>} The `download_url` field of the response body.
 */
const getDownloadUrl = async purchaseCode => {
  const params = { purchase_code: purchaseCode, shorten_url: true };
  const body = await instance.get('/v3/market/buyer/download', { params });
  return body.download_url;
};
/**
 * Fetches catalog details for one Envato item.
 *
 * The id is passed through axios `params`, so it is URL-encoded, and a null or
 * undefined id is left out of the query instead of being sent as the literal text.
 *
 * @param {string|number} id - Envato item id.
 * @returns {Promise<object>} Item details (response body).
 */
const getItemDetail = async id => {
  const data = await instance.get('/v3/market/catalog/item', { params: { id } });
  return data;
};
/**
 * Searches Envato for items similar to the given one ("more like this").
 *
 * @param {string|number} id - Envato item id to find similar items for (URL-encoded via axios `params`).
 * @returns {Promise<object>} Search response body.
 */
const getSimilarItem = async id => {
  const data = await instance.get('/v1/discovery/search/search/more_like_this', { params: { item_id: id } });
  return data;
};
/**
 * Lists one page of this year's items for an Envato site from the item search endpoint.
 *
 * Query values are passed through axios `params`, so they are URL-encoded.
 *
 * @param {string} site - Envato site identifier.
 * @param {number} page - 1-based results page.
 * @returns {Promise<object>} Search response body (callers read its `matches` array).
 */
const getSimilarItems = async (site, page) => {
  const data = await instance.get('/v1/discovery/search/search/item', { params: { site, date: 'this-year', page } });
  return data;
};
// Envato API client surface; each function resolves to the response body.
export default {
getDownloadUrl,
getItemDetail,
getSimilarItem,
getSimilarItems,
};