
How can I manage long-running task functions in a Node.js/Express.js application and prevent timeout errors? My application currently contains a time-consuming function whose result I don’t need immediately, but I still want it to run to completion. How can I return the API response right away while the function keeps executing in the background?

What I tried (I’m using TypeScript):

import fs from 'fs'
import { v4 as uuidv4 } from 'uuid';
import { Worker } from 'node:worker_threads';

api.ts:
router.post('/run-long-task', async (req, res) => {
    const taskId = uuidv4();

    const worker = new Worker(file, {
        workerData: { taskId },
        eval: true
    });

    // Register the task once, keeping a reference to its worker
    tasksById.set(taskId, {
        status: 'running',
        started: Date.now(),
        worker,
    });

    // The worker posts a single message when the task is done
    worker.on('message', () => {
        tasksById.set(taskId, { status: 'completed' });
    });

    worker.on("error", (error) => {
        tasksById.set(taskId, { status: "error", error: error.message });
    });

    worker.on("exit", (code) => {
        if (code !== 0) {
            if (tasksById.get(taskId)?.status === "running") {
                tasksById.set(taskId, {
                    status: "error",
                    error: `Worker stopped with exit code ${code}`,
                });
            } else {

            }
        }
    });
    res.json({ taskId });
})

worker.ts:
import { parentPort } from "node:worker_threads";

async function performTask() {
    // Long Task Function
}

if (parentPort) {
    parentPort.on('message', async (task) => {
        // Wait for the task to finish before reporting success
        await performTask();
        parentPort?.postMessage({ success: '1' });
    });
}

2 Answers


  1. If the task takes so long that your browser times out the request while it’s running, you don’t want to run it on the main thread at all, whether before or after you send your response to that API call. Assuming the work is CPU-bound rather than waiting on I/O, the main thread will be tied up and unable to process any subsequent calls until it’s finished.

    Instead, run the task in a worker thread. Worker threads run independently of the main thread. You can use messaging to communicate between the main thread and the worker thread (for instance, so the worker knows what to start doing, and later the main thread knows the task is complete).

    Here’s a very rough example, see comments:

    import { Worker } from "node:worker_threads";
    import { v4 as uuid } from "uuid";

    // Something to keep track of active tasks.
    // Just using a Map like this is probably NOT a good idea, at least not without
    // code that cleans out old tasks periodically. But my goal here is to give
    // you *some* structure you can refine.
    const tasksById = new Map();
    
    // An endpoint that starts the task
    app.post("/api/start-task", (req, res) => {
        // Get a unique ID for this task
        const taskId = uuid();
    
        // Run the worker
        const worker = new Worker("./worker.js", {
            workerData: {
                // ...here you could pass details of what needs to be done to the worker...
            },
        });
    
        // Remember this task in the map
        tasksById.set(taskId, {
            status: "running",
            started: Date.now(), // You could use this to clear out old entries with a periodic process.
            worker, // You could use this to terminate the worker when removing a task.
        });
    
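        // Accept status updates from the worker, keeping the other fields
        // (started, worker) so the task can still be cleaned up later
        worker.on("message", (status) => {
            const task = tasksById.get(taskId);
            if (task) {
                task.status = status;
            }
        });
    
        // Handle errors from the worker
        worker.on("error", (error) => {
            const task = tasksById.get(taskId);
            if (task) {
                task.status = "error";
                task.error = error.message;
            }
        });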
    
        // If the worker exits with a non-zero exit code and the task
        // is "running", flag it as having failed
        worker.on("exit", (code) => {
            if (code !== 0) {
                if (tasksById.get(taskId)?.status === "running") {
                    tasksById.set(taskId, {
                        status: "error",
                        error: `Worker stopped with exit code ${code}`,
                    });
                } else {
                    // ...probably worth doing *something* to report the error...
                }
            }
        });
    
        // Reply with the task ID
        res.json({ taskId });
    });
    
    // An endpoint the caller can use to get the status of a task.
    // NOTE: This requires polling, which is usually a bad idea.
    // Ideally, use a web socket or something similar. But doing an
    // example of that is really out of scope.
    app.get("/task-status/:taskId", (req, res) => {
        // Get the task ID from the request, get the status, report back
        const { taskId } = req.params;
        const task = tasksById.get(taskId);
        const status = task?.status ?? "not found";
        res.json({ taskId, status });
    });
    
    // An endpoint the caller can use to explicitly clean up a task it no longer needs.
    app.get("/remove-task/:taskId", (req, res) => {
        const { taskId } = req.params;
        const task = tasksById.get(taskId);
        let status;
        if (task) {
            tasksById.delete(taskId);
            // The entry may no longer hold the worker once its status has been replaced
            task.worker?.terminate();
            status = "removed";
        } else {
            status = "not found";
        }
        res.json({ taskId, status });
    });
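
The comments above mention clearing out old entries periodically, using the `started` and `worker` fields. A minimal sketch of one way that might look; the function name `pruneOldTasks`, the one-hour age limit, and the one-minute interval are all assumptions for illustration, not part of the original example:

```javascript
// Stands in for the Map from the example above.
const tasksById = new Map();

// Remove every task older than maxAgeMs and return how many were removed.
// `now` is a parameter only so the function is easy to test.
function pruneOldTasks(tasks, maxAgeMs, now = Date.now()) {
    let removed = 0;
    for (const [taskId, task] of tasks) {
        // Entries without a start time are stamped now and aged from here on
        if (task.started === undefined) {
            task.started = now;
        }
        if (now - task.started > maxAgeMs) {
            // Stop the worker if the entry still holds a reference to it
            task.worker?.terminate();
            tasks.delete(taskId); // deleting during Map iteration is safe
            removed++;
        }
    }
    return removed;
}

// Hypothetical schedule: once a minute, drop tasks older than one hour.
const pruneTimer = setInterval(
    () => pruneOldTasks(tasksById, 60 * 60 * 1000),
    60 * 1000
);
// unref() keeps this timer from holding the process open on its own
pruneTimer.unref();
```

Passing the map in as an argument keeps the function independent of where the tasks are stored, so the same code would still apply if the map were later replaced by something more durable.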
    

    Again, that’s very rough.
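
The example starts `./worker.js` but does not show that file. Here is a minimal sketch of what it might contain, assuming the task details arrive in `workerData` and the worker reports a plain status string that the main thread's `"message"` handler stores. `performTask` and the `iterations` field are placeholders, not part of the original answer:

```javascript
// worker.js (hypothetical contents)
const { parentPort, workerData } = require("node:worker_threads");

// Placeholder for the real long-running work: a CPU-bound loop
function performTask(data) {
    const iterations = data?.iterations ?? 1e6;
    let sum = 0;
    for (let i = 0; i < iterations; i++) {
        sum += i;
    }
    return sum;
}

// parentPort is null when this file is run directly rather than as a worker
if (parentPort) {
    // If performTask throws, the Worker object in the main thread emits
    // "error", so no try/catch is needed just to report the failure.
    performTask(workerData);
    parentPort.postMessage("completed");
}
```

Once the message has been posted and nothing else is pending, the worker exits with code 0, so the `"exit"` handler in the main thread leaves the task's status alone.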

    That’s just one approach, though. Another approach would be to start the worker on startup but have it sit there idle, then use messaging to tell it to process something.
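
A rough sketch of that second approach, assuming a single long-lived worker that receives jobs as messages and tags each reply with the task ID. The names `createTaskRunner` and `startTask` and the message shape `{ taskId, payload }` are assumptions for illustration. The worker source is kept inline and run with `eval: true` only so the sketch fits in one file:

```javascript
const { Worker } = require("node:worker_threads");

// Source of the long-lived worker; normally this would be its own file.
const workerSource = `
    const { parentPort } = require("node:worker_threads");
    // Sit idle until the main thread sends a job
    parentPort.on("message", ({ taskId, payload }) => {
        // ...the long-running work for this job goes here...
        parentPort.postMessage({ taskId, status: "completed" });
    });
`;

// Call this once at application startup.
function createTaskRunner() {
    const tasksById = new Map();
    const worker = new Worker(workerSource, { eval: true });

    // Each reply carries the ID of the task it belongs to
    worker.on("message", ({ taskId, status }) => {
        const task = tasksById.get(taskId);
        if (task) task.status = status;
    });

    // If the worker crashes, flag every task that was still running
    worker.on("error", (error) => {
        for (const task of tasksById.values()) {
            if (task.status === "running") {
                task.status = "error";
                task.error = error.message;
            }
        }
    });

    // Hand a job to the idle worker and return immediately
    function startTask(taskId, payload) {
        tasksById.set(taskId, { status: "running", started: Date.now() });
        worker.postMessage({ taskId, payload });
    }

    return { tasksById, worker, startTask };
}
```

A route handler would then call `startTask(taskId, details)` and reply with the task ID straight away. With one worker, jobs whose work is synchronous are processed one after another; running several at once would need more than one worker.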

  2. Building on T.J. Crowder’s example and the one below, try putting your long-running function inside the parentPort.on() handler in worker.ts.

    index.ts:

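    const fs = require('fs');
    const { Worker } = require('node:worker_threads');
    
    // Read the worker source as text. With `eval: true` the Worker runs this
    // string as JavaScript, so worker.ts must contain plain JavaScript.
    const file = fs.readFileSync('./worker.ts', { encoding: 'utf8', flag: 'r' });
    
    // Resolve with the worker's first message; reject on error or non-zero exit.
    function runService(worker) {
        return new Promise((resolve, reject) => {
            worker.on('message', resolve);
            worker.on('error', reject);
            worker.on('exit', (code) => {
                if (code !== 0)
                    reject(new Error(`Worker stopped with exit code ${code}`));
            });
            // Tell the worker to start
            worker.postMessage({ toworker: 'run' });
        });
    }
    
    async function run() {
        const worker = new Worker(file, {
            eval: true
        });
        const result = await runService(worker);
        console.log(result);
        // The worker's 'message' listener keeps it alive, so stop it explicitly
        await worker.terminate();
    }
    
    run().catch(err => console.error(err));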
    

    worker.ts:

    const { parentPort } = require('node:worker_threads');
    parentPort.on('message', (task) => {
        // Put the long-running function here; reply once it has finished
        parentPort.postMessage({ success: '1' });
    });
    