-
Hello, I'm working on a Node app that acts as a proxy when uploading files to an S3 bucket. One of the requirements is to reduce the latency between the user's upload and the server's upload, and to never buffer whole files in memory before uploading them. So I studied the multer code and wrote something similar for my case, trying to handle as many cases as possible. The chunks are uploaded to the bucket successfully, but when I check the bucket afterwards, each upload has produced two versions: one of 0 B and a second with the actual file size. When I upload the first file and check its versions, I see it as described above. When I upload another file, it overwrites the previously uploaded file: the earlier file is moved to the versions list and only the latest uploaded file is kept, even though the filename is always unique and the files should never conflict with each other. I don't know why this happens; I have checked the code multiple times and couldn't find the mistake causing it. Below are my code and screenshots of the versions and of the currently available files. There should be two files, but only one is kept and the other one was (deleted / moved) to versions.
-
Here's the screenshot of the versions:

- A screenshot of the existing files:

const BusBoy = require('busboy');
const appendField = require('append-field');
const createHttpError = require('http-errors');
const { Upload } = require('@aws-sdk/lib-storage');
const { DeleteObjectCommand, PutObjectCommand, AbortMultipartUploadCommand } = require('@aws-sdk/client-s3');
const onFinished = require('on-finished');
const { v4: uuidv4 } = require('uuid');
const path = require('path');
const { EventEmitter } = require('events');
/**
 * Counter of in-flight file uploads that emits `'zero'` when the last one settles.
 */
class FilesCounter extends EventEmitter {
  constructor() {
    super();
    // Number of uploads that were started and have not settled yet.
    this.value = 0;
  }

  /**
   * Register one more pending upload.
   * @returns {void}
   */
  increment() {
    this.value++;
  }

  /**
   * Mark one pending upload as settled and emit `'zero'` when none are left.
   * Calling it while the counter is already zero is a no-op, so an unbalanced
   * extra call cannot drive the counter negative (which would make `isZero()`
   * permanently false and `'zero'` fire at the wrong moment).
   * @returns {void}
   */
  decrement() {
    if (this.value <= 0) {
      this.value = 0;
      return;
    }
    this.value -= 1;
    if (this.value === 0) {
      this.emit('zero');
    }
  }

  /**
   * @returns {boolean} `true` when no upload is pending
   */
  isZero() {
    return this.value === 0;
  }

  /**
   * Run `eventHandler` as soon as nothing is pending: immediately (synchronously)
   * when the counter is already zero, otherwise on the next `'zero'` event.
   * @param {() => any} eventHandler
   * @returns {void}
   */
  onceZero(eventHandler) {
    if (this.isZero()) {
      eventHandler();
      return;
    }
    this.once('zero', eventHandler);
  }
}
/**
 * Human-readable message for every error code `ErrorHandler` can carry.
 * Frozen because it is a shared lookup table that nothing should mutate at runtime.
 */
const errorMessages = Object.freeze({
  LIMIT_PART_COUNT: 'Too many parts',
  LIMIT_FILE_SIZE: 'File too large',
  LIMIT_FILE_COUNT: 'Too many files',
  LIMIT_FIELD_KEY: 'Field name too long',
  LIMIT_FIELD_VALUE: 'Field value too long',
  LIMIT_FIELD_COUNT: 'Too many fields',
  LIMIT_UNEXPECTED_FILE: 'Unexpected field',
  MISSING_FIELD_NAME: 'Field name missing',
  CLIENT_ABORTED: 'Client aborted',
});
/**
 * Error raised by the upload middleware. Carries a machine-readable `code`
 * (a key of `errorMessages`) and, optionally, the form `field` it relates to.
 */
class ErrorHandler extends Error {
  /**
   * @param {keyof errorMessages | { code: keyof errorMessages; seperate: string }} code
   *   either the bare code, or the code plus extra text (`seperate`) appended to the message
   * @param {string} [field] name of the form field the error relates to
   */
  constructor(code, field) {
    super();
    const isBareCode = typeof code === 'string';
    const messageCode = isBareCode ? code : code?.code;
    // An unknown code must not blow up with a TypeError inside the error constructor
    // (that would mask the real failure), so fall back to a generic message.
    const knownMessage = errorMessages[messageCode];
    const baseMessage = typeof knownMessage === 'string' ? knownMessage : `Upload error (${String(messageCode)})`;
    // Only append the extra text when it was actually provided (avoids a trailing " undefined").
    const hasDetail = !isBareCode && typeof code?.seperate === 'string' && code.seperate.length > 0;
    this.name = this.constructor.name;
    this.message = hasDetail ? `${baseMessage} ${code.seperate}` : baseMessage;
    this.code = messageCode;
    if (typeof field === 'string') {
      this.field = field;
    }
    Error.captureStackTrace(this, this.constructor);
  }
}
/**
* @typedef {{
* fieldname: string;
* originalname: string;
* encoding: string;
* mimetype: string;
* filename: string;
* }} file
* @typedef {(filterErr: ErrorHandler | Error, allow: boolean) => any} callback
*/
module.exports = class S3UploadMiddleware {
/**
* @param {{
* S3Client: import('@aws-sdk/client-s3').S3Client;
* BUCKET_NAME: string;
* logger?: import('winston').Logger | Console;
* filenameGenerator?: (fileName: string) => string;
* }} param0
*/
constructor({ S3Client, logger, filenameGenerator, BUCKET_NAME }) {
this.s3 = S3Client;
this.logger = logger || console;
this.filenameGenerator = filenameGenerator || this.generateUniqueFileName;
this.BUCKET_NAME = BUCKET_NAME;
}
/**
* @param {{
* fields?: {name: string; maxCount: number}[],
* limits?: import('busboy').Limits,
* fileFilter?: (req: import('express').Request, file: Omit<import('busboy').FieldInfo, 'mimeType'> & { filename: string; mimetype: string }, cb: (err?: Error, reject?: boolean) => void) => any
* }} options
*/
createMiddleware(options) {
// validate options
if (!options.fields?.[0]?.name) {
throw new Error('Invalid fields please specify the name of the fields you want to upload');
}
/**
* @param {import('express').Request} req
* @param {import('express').Response} res
* @param {import('express').NextFunction} next
*/
return async (req, res, next) => {
// skip none form data content
if (!this.isFormData(req.headers['content-type'])) return next();
const ContentLength = Number(req.headers['content-length'] || 0);
// >>>>>>>>>>>>>>>>>>>> init before create middleware <<<<<<<<<<<<<<<<<<
// create an Map of fields names and max files limit to upload to use it in allow uploading files or skip it or aborting process
const requiredFields = new Map((options.fields || []).map(e => [e.name, typeof e.maxCount === 'number' ? e.maxCount : Infinity]));
const isSingleFile = (options.fields || []).length === 1 && (options.fields || [])[0]?.maxCount === 1;
// handling filter files
const fileFilter = options.fileFilter || ((req, file, cb) => cb(null, true));
/**
* wrapper to handle when field isn't exists in fields array of objects to indicate whether to skip it or continue or abort process when field max count is reached
* @param {import('express').Request} req
* @param {file} file
* @param {callback} cb
*/
const filesFilterWrapper = (req, file, cb) => {
const filesMaxCount = requiredFields.get(file.fieldname);
if (typeof filesMaxCount === 'undefined') {
this.logger.warn(`Unexpected file field [${file.fieldname}] - skipping upload`);
return cb(new ErrorHandler('LIMIT_UNEXPECTED_FILE', file.fieldname), false);
}
if (filesMaxCount <= 0) {
this.logger.warn(`File limit exceeded for field [${file.fieldname}] - skipping`);
return cb(new ErrorHandler('LIMIT_FILE_COUNT', file.fieldname), false);
}
// update the file limit counter
requiredFields.set(file.fieldname, filesMaxCount - 1);
fileFilter(req, file, cb);
};
// const ContentLength = req.headers['content-length'];
// prepare request body parse form fields inside busboy 'field' event
req.body = {};
/**
* @type {import('busboy').Busboy | undefined}
*/
let busboy;
try {
busboy = BusBoy({ headers: req.headers, limits: options.limits });
} catch (err) {
this.logger.error('BusBoy init error', err);
return next(err);
}
// >>>>>>>>>>>>> start handling file upload to bucket <<<<<<<<<<<<<<<<<<<
// indicators
let isDone = false;
let readFinished = false;
let errorOccured = false;
// handlers to handle abort uploades / remove uploaded files and count the number of pending files to call next when receive zero files signal and call next
const pendingWrites = new FilesCounter();
/**
* @type {string[]}
*/
const uploadedFiles = [];
/**
* @type {AbortController[]}
*/
const abortControllers = [];
// >>>>>>>>>>>>>>>>>> initialize helpers <<<<<<<<<<<<<<<<<<<
// cleanup / delete uploaded files
const cleanupUploadedFiles = async slient => {
if (!uploadedFiles.length) return;
// Use Promise.allSettled to handle cleanup gracefully, even if some deletions fail
const results = await Promise.allSettled(uploadedFiles.map(this.deleteObject));
const rejectedResult = results.find(result => result.status === 'rejected');
if (rejectedResult) {
this.logger.error(`Failed to clean up some files: ${rejectedResult.reason}`);
cleanupBusboy();
if (!slient) {
// TODO: check if the reason can be thrown
throw rejectedResult.reason; // this.mapBucketErrorToHttpError(rejectedResult.reason);
}
}
this.logger.info('uploaded files clean up success');
};
// function to help clean up and abort processing files
const abortWithError = err => {
if (errorOccured) return;
errorOccured = true;
// Abort all ongoing file uploads
abortControllers.forEach(controller => controller.abort('Operation Aborted!'));
pendingWrites.onceZero(async () => {
cleanupBusboy();
try {
console.time('Clean up all files done in');
await cleanupUploadedFiles();
console.timeEnd('Clean up all files done in');
} catch (cleanupErr) {
if (cleanupErr.name === 'AbortError') {
this.logger.info('operation aborted');
} else {
this.logger.error('Delete object error', cleanupErr);
return next(cleanupErr); // this.mapBucketErrorToHttpError(cleanupErr);
}
}
next(err);
});
};
function cleanupBusboy() {
req.unpipe(busboy);
busboy.removeAllListeners();
}
/**
* helper function to call next and clean up when upload is complete without errors
* @param {*} err
*/
function handleDone(err) {
if (isDone) return;
isDone = true;
cleanupBusboy();
next(err);
}
/**
* helper function to handle complete the process when everything is done
*/
function completeProcess() {
// check if the process completed successfully without errors and no files is pending
if (readFinished && pendingWrites.isZero() && !errorOccured) {
handleDone();
}
}
// >>>>>>> Client Disconnect Handling <<<<<<<
onFinished(res, () => {
if (!isDone) {
this.logger.warn('Client disconnected, aborting remaining uploads');
abortWithError(new ErrorHandler('CLIENT_ABORTED'));
}
console.log('Client disconnected, no need to abort');
});
// >>>>>>>>>>>>>>>>>> handling busboy events <<<<<<<<<<<<<<<<<<<
busboy.on('file', (fieldname, fileStream, info) => {
const { filename, encoding, mimeType } = info;
if (!fieldname || !filename) return fileStream.resume(); // Skip if no file
/**
* init the file object to pass the file info to next function
* @type {file}
*/
const file = { fieldname, originalname: filename, encoding, mimetype: mimeType };
// Pause the file stream before processing to prevent uncontrolled flow
fileStream.pause();
// handle filter files before start uploading to bucket
filesFilterWrapper(req, file, (filterErr, allow) => {
// always make error messages on top to avoid sening user to next handler when there's a validation error occured
if (filterErr) return abortWithError(filterErr);
if (!allow) return fileStream.resume(); // Skip disallowed files
// create a file unique name to avoid files overwrite / names conflict
file.filename = this.filenameGenerator(filename);
// save the file name to uploadedFiles array to handle abort / cleanup when and error occurs or process aborted
uploadedFiles.push(file.filename);
// check if it's a single file to pass it to req.file or multiple files to pass to req.files instead
if (isSingleFile) {
req.file = file;
} else {
req.files = (req.files || []).concat([file]);
}
// create abortControllder to handle abort process and cleanup
const abortController = new AbortController();
const controllerIndex = abortControllers.length;
abortControllers.push(abortController);
// call counter to send signature there's a file started to upload
pendingWrites.increment();
// Resume file stream once checks are complete
fileStream.resume();
console.log({ ContentLength, ContentType: file.mimetype, ContentEncoding: file.encoding });
// let currentUploadId;
this.uploadUnknowLengthStreamObject(fileStream, file.filename, {
config: { signal: abortController.signal, tags: [{ Key: file.filename, Value: file.originalname }] },
params: {
ContentLength: ContentLength > this.BytesConverter.MBToBytes(5) ? ContentLength : undefined, // allow to upload files less than 5MB by setting ContentLength as undefined
ContentEncoding: file.encoding,
ContentType: file.mimetype || 'application/octet-stream',
},
})
// this.uploadObject(fileStream, file.filename, {
// sendOptions: { abortSignal: abortController.signal },
// paramsOptions: {
// /* ContentLength, */
// ContentEncoding: file.encoding,
// ContentType: file.mimetype || 'application/octet-stream',
// Tagging: `${file.filename}=${file.originalname}`,
// },
// })
.then(result => {
this.logger.log(JSON.stringify(result, null, 2), '............. result .......................');
this.logger.info(`File [${filename}] uploaded successfully`);
// call decrement to indicate there a file has been uploaded
pendingWrites.decrement();
abortControllers.splice(controllerIndex, 1); // remove the controller from abortControllers array when successfully uploaded
// call complete process to go next when zero files is pending
completeProcess();
// handle case where uploaded file size is smaller than 5MB to remove it after upload done
if (errorOccured) {
cleanupUploadedFiles();
}
})
.catch(err => {
// call decrement to indicate none any files hanging.
pendingWrites.decrement();
abortControllers.splice(controllerIndex, 1); // remove the controller from abortControllers array when done.
if (err.name === 'AbortError') {
this.logger.info('operation aborted');
} else {
this.logger.error('File upload error', err);
// handle abort error
abortWithError(err);
}
});
fileStream.on('error', err => {
// remove the pending file when an error occured
pendingWrites.decrement();
abortWithError(err);
});
fileStream.on('limit', () => {
// remove the pending file limit exceeded
pendingWrites.decrement();
abortWithError(
new ErrorHandler(
options.limits.fileSize
? { code: 'LIMIT_FILE_SIZE', seperate: `max size allowed is: ${this.BytesConverter.BytesToKB(options.limits.fileSize)}KB` }
: 'LIMIT_FILE_SIZE',
fieldname,
),
);
});
fileStream.on('close', () => {
// completeFileUpload(); // Local file stream finished
this.logger.info(`Finished processing file ${filename}`);
});
});
});
// handle parse formData and complex formData syntax like (pets[0][name] = value) then append it to req.body
busboy.on('field', (name, value, info) => {
if (!name) return abortWithError(new ErrorHandler('MISSING_FIELD_NAME'));
const { nameTruncated, valueTruncated } = info;
if (nameTruncated) return abortWithError(new ErrorHandler('LIMIT_FIELD_KEY'));
if (valueTruncated) return abortWithError(new ErrorHandler('LIMIT_FIELD_VALUE', name));
appendField(req.body, name, value);
});
// handling busboy events
busboy.on('error', err => {
busboy.removeAllListeners();
abortWithError(err);
});
busboy.on('partsLimit', () => {
abortWithError(new ErrorHandler('LIMIT_PART_COUNT'));
});
busboy.on('filesLimit', () => {
abortWithError(new ErrorHandler('LIMIT_FILE_COUNT'));
});
busboy.on('fieldsLimit', () => {
abortWithError(new ErrorHandler('LIMIT_FIELD_COUNT'));
});
busboy.on('close', () => {
this.logger.info('busboy closed!');
});
busboy.on('finish', () => {
readFinished = true;
completeProcess();
});
req.pipe(busboy);
};
}
/**
 * Conversions between bytes, kibibyte-sized "KB" (1024 bytes) and "MB" (1024 * 1024 bytes).
 */
BytesConverter = (() => {
  const bytesPerKB = 1024;
  const bytesPerMB = 1024 * 1024;
  return {
    /**
     * megabytes -> bytes
     * @param {number} n
     * @returns {number}
     */
    MBToBytes: n => {
      return n * bytesPerMB;
    },
    /**
     * bytes -> kilobytes
     * @param {number} n
     * @returns {number}
     */
    BytesToKB: n => {
      return n / bytesPerKB;
    },
    /**
     * kilobytes -> bytes
     * @param {number} n
     * @returns {number}
     */
    KBToBytes: n => {
      return n * bytesPerKB;
    },
    /**
     * bytes -> megabytes
     * @param {number} bytes
     * @returns {number}
     */
    BytesToMB: bytes => {
      return bytes / bytesPerMB;
    },
  };
})();
/**
 * Check whether a content-type header value denotes multipart form data.
 * Returns `false` (instead of throwing) when the header is missing or not a string,
 * and compares case-insensitively because media type names are case-insensitive.
 * @param {string | undefined} str raw `content-type` header value
 * @returns {boolean}
 */
isFormData = str => typeof str === 'string' && str.trimStart().toLowerCase().startsWith('multipart/form-data');
/**
* create a file uniqueName
* @param {string} originalname
* @returns {string}
*/
generateUniqueFileName(originalname) {
const filename = uuidv4();
const timestamp = Date.now();
const name = originalname ? Buffer.from(originalname, 'latin1').toString('utf8') : 'file.png';
return `${filename}--${timestamp}${path.extname(name)}`;
}
/**
* Function to map bucket delete files errors to HTTP errors
* @param {import('@aws-sdk/client-s3').S3ServiceException} err
*/
mapBucketErrorToHttpError(err) {
// If the bucket error has a specific status code or type, map it to HTTP errors
if (err.$metadata.httpStatusCode) {
// You can adjust the mappings based on the bucket service's status codes and error structure
switch (err.statusCode) {
case 401:
return createHttpError(401, 'UnAuthorized: Access is denied');
case 403:
return createHttpError(403, 'Forbidden: Access is denied');
case 404:
return createHttpError(404, 'Not Found: The requested file does not exist');
case 500:
return createHttpError(500, 'Internal Server Error: service failure');
default:
return createHttpError(err.$metadata.httpStatusCode, err.message || 'operation failed');
}
}
// For other types of errors or unknown status codes, return a generic 500 error
return createHttpError(500, err.message || 'An unknown error occurred during the bucket operation');
}
/**
 * Delete one object from the configured bucket.
 * @param {string} fileName object key to delete
 * @returns {Promise<import('@aws-sdk/client-s3').DeleteObjectCommandOutput>}
 * @throws rethrows whatever the S3 client rejects with, after logging it
 */
deleteObject = async fileName => {
  const params = {
    Bucket: this.BUCKET_NAME,
    Key: fileName,
  };
  try {
    // `await` is required here: without it a rejected request bypasses this try/catch entirely
    return await this.s3.send(new DeleteObjectCommand(params));
  } catch (err) {
    this.logger.error('Error Delete Object', err);
    throw err;
  }
};
/**
 * Stream a body of unknown length to the bucket through `@aws-sdk/lib-storage`'s `Upload`.
 *
 * The upload is created and started synchronously (before the first `await`), so the caller
 * can hand over a stream it has just resumed. When `options.config.signal` aborts, the upload
 * is aborted too.
 * @param {import('@aws-sdk/client-s3').PutObjectCommand['input']['Body']} fileStream
 * @param {string} fileName object key
 * @param {{config?: import('@aws-sdk/lib-storage').Configuration & { signal: AbortSignal }, params?: Omit<import('@aws-sdk/lib-storage').Options['params'], 'Body' | 'Key' | 'Bucket'> }} options
 *   `config` entries other than `signal` are passed to `Upload`; `params` override the default object params
 * @returns {Promise<*>} whatever `upload.done()` resolves with
 * @throws rethrows the upload error; aborted uploads reject with an error named `AbortError`
 */
uploadUnknowLengthStreamObject = async (fileStream, fileName, options = {}) => {
  // `signal` is handled here and is not an option for `Upload`, so keep it out of the spread
  const { signal, ...uploadConfig } = options.config || {};
  let onAbort;
  try {
    if (signal?.aborted) {
      // already cancelled: do not start an upload at all
      throw Object.assign(new Error('Upload aborted.'), { name: 'AbortError' });
    }
    /**
     * @type {import('@aws-sdk/lib-storage').Options['params']}
     */
    const params = {
      Bucket: this.BUCKET_NAME,
      Key: fileName,
      Body: fileStream,
      ACL: 'public-read', // make file publicly accessible
      ...(options.params || {}),
    };
    const upload = new Upload({
      client: this.s3,
      params,
      queueSize: 2, // number of parts uploaded in parallel
      ...uploadConfig,
    });
    if (signal) {
      // addEventListener (not `signal.onabort = ...`) so other abort handlers are not overwritten
      onAbort = () => {
        this.logger.info('............ upload aborted ..............');
        Promise.resolve(upload.abort()).catch(abortErr => this.logger.error('Error aborting upload', abortErr));
      };
      signal.addEventListener('abort', onAbort, { once: true });
    }
    upload.on('httpUploadProgress', progress => {
      this.logger.info(`Uploaded ${progress.loaded} of ${progress.total} bytes`);
    });
    // `await` so that a rejected upload is seen by the catch/finally below
    return await upload.done();
  } catch (err) {
    if (err?.name !== 'AbortError') {
      this.logger.error('Error Uploading Files', err);
    }
    throw err;
  } finally {
    if (signal && onAbort) {
      signal.removeEventListener('abort', onAbort);
    }
  }
};
/**
 * Upload a body to the bucket with a single `PutObjectCommand`.
 * NOTE(review): presumably a stream body needs `ContentLength` in `paramsOptions` for a plain
 * PutObject — confirm before using this with streams of unknown length.
 * @typedef {Parameters<typeof this.s3.send>[1]} sendOptions
 * @param {import('@aws-sdk/client-s3').PutObjectCommand['input']['Body']} fileStream
 * @param {string} fileName object key
 * @param {{
 * sendOptions: sendOptions,
 * paramsOptions: Omit<PutObjectCommand['input'], 'Bucket' | 'Key' | 'Body'>
 * }} options
 * @returns {Promise<import('@aws-sdk/client-s3').PutObjectCommandOutput>}
 * @throws rethrows whatever the S3 client rejects with, after logging it
 */
uploadObject = async (fileStream, fileName, options) => {
  /**
   * @type {PutObjectCommand['input']}
   */
  const params = {
    Bucket: this.BUCKET_NAME,
    Key: fileName,
    Body: fileStream,
    ...(options?.paramsOptions || {}),
    ACL: 'public-read', // make file publicly accessible (cannot be overridden by paramsOptions)
  };
  try {
    // `await` is required here: without it a rejected request bypasses this try/catch entirely
    return await this.s3.send(new PutObjectCommand(params), options?.sendOptions);
  } catch (err) {
    this.logger.error(err);
    throw err;
  }
};
};
const { s3 } = require('.');
const { BUCKET_NAME } = require('../config');
const S3UploadMiddleware = require('../lib/S3UploadMiddleware');
// MIME types accepted by the image upload endpoint.
const imagesWhiteList = ['image/png', 'image/jpeg', 'image/jpg', 'image/webp'];

// One shared middleware factory bound to the application's S3 client and bucket.
const s3Upload = new S3UploadMiddleware({
  BUCKET_NAME,
  S3Client: s3,
  logger: console,
});

/**
 * Middleware accepting a single file in the `image` field, at most 10 MB,
 * whose MIME type is in `imagesWhiteList`.
 */
const imageUpload = s3Upload.createMiddleware({
  fields: [{ name: 'image', maxCount: 1 }],
  limits: { fileSize: s3Upload.BytesConverter.MBToBytes(10) },
  fileFilter(req, file, cb) {
    const isWhitelisted = imagesWhiteList.includes(file.mimetype);
    if (isWhitelisted) {
      cb(null, true); // Accept the file
      return;
    }
    // Reject the file, listing the accepted subtypes in the translated message
    const types = imagesWhiteList.map(mime => mime.split('/')[1]).join(', ');
    cb(new Error(req.t('INVALID_MIME_TYPE', { types })), false);
  },
});
- I hope someone can help me fix this. Thanks!