
Here is my code in Node.js:

  const fs = require('fs');
  const readline = require('readline');
  const filePath = 'the/path/to/the/file';
  const linesPerChunk = 20;
  const readStream = fs.createReadStream(filePath, { encoding: 'utf8' });
  const rl = readline.createInterface({
    input: readStream,
    crlfDelay: Infinity
  });

  let lines = [];

  rl.on('line', async (line) => {
    lines.push(JSON.parse(line));

    if (lines.length === linesPerChunk) {
      await processLines(lines);
      lines = [];
    }
  });

  rl.on('close', async () => {
    // Process any remaining lines (less than chunk size)
    if (lines.length > 0) {
      await processLines(lines);
    }
  });

  async function processLines(lines) {
    console.log("process lines");
    // Do something with the lines (e.g., print them)
    try {
      const returnData = await someaction();
      console.log(returnData);
      console.log('Done for a chunk.');
      return returnData;
    } catch (error) {
      console.log(error);
    }
  }

The first pass goes well, but by the time the line lines = []; runs, the whole file has been loaded into the lines variable, as if the system had kept processing new lines even though I'm supposed to be awaiting the result of the function before processing more lines.

It then immediately goes into the close event and tries to process the whole file at once (hence trying to reprocess the chunk I’ve already processed in addition to all the other ones).

What am I missing here?
I would like each chunk to be processed one by one, in a sequential manner (or even in parallel, as long as no single line gets processed more than once).

2 Answers


  1. The first pass goes well, but by the time the line lines = []; runs,
    the whole file has been loaded into the lines variable, as if the
    system had kept processing new lines even though I'm supposed to be
    awaiting the result of the function before processing more lines.

    An await inside the 'line' listener only defers the rest of that one callback; readline does not wait for it and keeps emitting 'line' events, so while the first processLines call is still pending those new lines are pushed into the very same array. You need to pause the stream while you process a chunk and resume it afterwards. One way of doing it is sketched below.
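
    A minimal sketch of the pause/resume idea, reusing filePath, linesPerChunk and processLines from the question; the key points are that the chunk is handed off and the array is reset synchronously, before anything is awaited, and that reading resumes only once the chunk has been processed:

      const fs = require('fs');
      const readline = require('readline');

      const rl = readline.createInterface({
        input: fs.createReadStream(filePath, { encoding: 'utf8' }),
        crlfDelay: Infinity
      });

      let lines = [];

      rl.on('line', (line) => {
        lines.push(JSON.parse(line));

        if (lines.length === linesPerChunk) {
          const chunk = lines;   // hand the chunk off and reset the array
          lines = [];            // synchronously, before anything asynchronous runs
          rl.pause();            // stop feeding more of the file to readline
          processLines(chunk)
            .catch(console.error)
            .finally(() => rl.resume());   // read on only once the chunk is done
        }
      });

      rl.on('close', () => {
        // Remaining lines (fewer than linesPerChunk)
        if (lines.length > 0) {
          processLines(lines).catch(console.error);
        }
      });

    As noted further down, pause() only throttles the underlying reads, so readline may still deliver a handful of already-buffered lines before it actually stops; they simply accumulate in the fresh lines array.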

    Another option is a Transform stream, if you want to do some processing on each incoming line; a sketch of that idea follows.
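
    A minimal sketch of the transform approach, again reusing filePath, linesPerChunk and processLines; ChunkLines is a hypothetical transform that splits the incoming text into lines, parses each one as JSON and emits arrays of linesPerChunk parsed objects:

      const fs = require('fs');
      const { Transform } = require('stream');

      class ChunkLines extends Transform {
        constructor(size) {
          super({ readableObjectMode: true });
          this.size = size;
          this.carry = '';   // incomplete last line of the previous chunk of text
          this.batch = [];
        }

        _transform(data, _encoding, callback) {
          const parts = (this.carry + data.toString('utf8')).split(/\r?\n/);
          this.carry = parts.pop();   // may be a partial line, kept for next time
          try {
            for (const line of parts) {
              if (line === '') continue;
              this.batch.push(JSON.parse(line));
              if (this.batch.length === this.size) {
                this.push(this.batch);
                this.batch = [];
              }
            }
            callback();
          } catch (err) {
            callback(err);
          }
        }

        _flush(callback) {
          try {
            if (this.carry !== '') this.batch.push(JSON.parse(this.carry));
            if (this.batch.length > 0) this.push(this.batch);
            callback();
          } catch (err) {
            callback(err);
          }
        }
      }

      async function run() {
        const batches = fs
          .createReadStream(filePath, { encoding: 'utf8' })
          .pipe(new ChunkLines(linesPerChunk));

        // Awaiting inside the loop applies back-pressure: the next batch is not
        // pulled from the transform until processLines has finished.
        for await (const batch of batches) {
          await processLines(batch);
        }
      }

      run().catch(console.error);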

    Note that readline's pause() does not immediately stop 'line' events for data it has already buffered, so a few extra lines can still arrive after pausing; there are third-party libraries that give stricter control over this.
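
    A readline interface is also async iterable in current Node versions, so another way to get strictly sequential chunks, without pausing manually, is to consume it with for await...of: the next iteration of the loop does not start until the loop body has finished. A minimal sketch, once more reusing filePath, linesPerChunk and processLines:

      const fs = require('fs');
      const readline = require('readline');

      async function run() {
        const rl = readline.createInterface({
          input: fs.createReadStream(filePath, { encoding: 'utf8' }),
          crlfDelay: Infinity
        });

        let lines = [];

        for await (const line of rl) {
          lines.push(JSON.parse(line));
          if (lines.length === linesPerChunk) {
            await processLines(lines);   // the next line is handled only after this resolves
            lines = [];
          }
        }

        // Remaining lines (fewer than linesPerChunk)
        if (lines.length > 0) {
          await processLines(lines);
        }
      }

      run().catch(console.error);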

  2. "By applying this code,the ‘someAction’ function returns a promise, and the ‘processLines’ function awaits the completion of this promise before moving on to the next chunk. This should help ensure that each chunk is processed sequentially "

    const fs = require('fs');
    const readline = require('readline');
    const filePath = 'the/path/to/the/file';
    const linesPerChunk = 20;
    const readStream = fs.createReadStream(filePath, { encoding: 'utf8' });
    const rl = readline.createInterface({
      input: readStream,
      crlfDelay: Infinity
    });
    
    let lines = [];
    
    rl.on('line', async (line) => {
      lines.push(JSON.parse(line));
    
      if (lines.length === linesPerChunk) {
        await processLines(lines);
        lines = [];
      }
    });
    
    rl.on('close', async () => {
      if (lines.length > 0) {
        await processLines(lines);
      }
    });
    
    async function processLines(lines) {
      console.log("process lines");'
      try {
        const returnData = await someAction(lines);
        console.log(returnData);
        console.log('Done for a chunk.');
        return returnData;
      } catch (error) {
        console.log(error);
      }
    }
    
    function someAction(lines) {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          console.log("Simulating some asynchronous action");
          resolve('Result of the asynchronous action');
        }, 1000);
      });
    }
    