Here is my code in Node.JS
const fs = require('fs');
const readline = require('readline');
const filePath = 'the/path/to/the/file';
const linesPerChunk = 20;
const readStream = fs.createReadStream(filePath, { encoding: 'utf8' });
const rl = readline.createInterface({
input: readStream,
crlfDelay: Infinity
});
let lines = [];
rl.on('line', async (line) => {
lines.push(JSON.parse(line));
if (lines.length === linesPerChunk) {
await processLines(lines);
lines = [];
}
});
rl.on('close', async () => {
// Process any remaining lines (less than chunk size)
if (lines.length > 0) {
await processLines(lines);
}
});
async function processLines(lines) {
console.log("process lines");
// Do something with the lines (e.g., print them)
try {
const returnData = await someaction();
console.log(returnData);
console.log('Done for a chunk.');
return returnData;
} catch (error) {
console.log(error);
}
}
The first pass goes well but on the line lines = [];
the whole file is loaded in the lines
variable as if the system had kept processing new lines even though I’m supposed to await
the result of the function before processing more lines.
It then immediately goes into the close
event and tries to process the whole file at once (hence trying to reprocess the chunk I’ve already processed in addition to all the other ones).
What am I missing here ?
I would like each chunk to be processed one by one in a sequential manner (or in parallel in fact but guaranteeing that no single line gets processed several times).
2
Answers
You are using streams and await only defer processing for any async operation until it is completed. You need to pause and unpause the streams while you process it. This is one way of doing it.
Another solution could be to use transform stream if you want to do some processing for incoming line.
By default readline interface somehow not support the pause on it, you can check here using a TP library.
"By applying this code,the ‘someAction’ function returns a promise, and the ‘processLines’ function awaits the completion of this promise before moving on to the next chunk. This should help ensure that each chunk is processed sequentially "