skip to Main Content

I am creating node bull queue and passing a dynamic name as an option to the Queue.add function

myQueue.add(`myJob-${val}`, {
    attempts: 3,
    removeOnFail: true
});

I am defining the process name as below for the above job

myQueue.process(`myJob-${val}`, async (job, callback) => {
    try {
        console.log('Processing job', job.id, job.data);
        callback();

    } catch (err) {
        console.log(err);
    }
});

However, I am getting below error

Job ID 1 failed Error: Missing process handler for job type myJob-123

How to correctly define the processor with a dynamic name value?

2

Answers


  1. Shouldn’t your 2nd argument to .add() be the Job data? The job options you are passing in as the 2nd argument are correct, but they should be third.

    Possibly try this instead:

    myQueue.add(`myJob-${val}`, { lorem: "ipsum" }, {
        attempts: 3,
        removeOnFail: true
    });
    

    In cases where I’ve received that same error of Missing process handler for job ___ it has been because I have added the job before the processor is running. When that issue is unavoidable increasing the attempts (like you did) to a number greater than 1 has fixed it. However, you’re passing in those job options as the data, so the attempts won’t affect your queue.

    Login or Signup to reply.
  2. The mistake you are making is passing extra fields in the add method.
    Remove the name value from add method everything will work
    only two parameters you have to pass while adding a job::–> data and options

    myQueue.add({dummydatakey:"dummydataval"}, {attempts: 3,removeOnFail: true});
    

    A sample job handler file for your Ref:

    const Queue = require('bull'); 
    
    module.exports = {
    
    jobStatusCheckScheduler: async function () {
        console.log('hi')
        // 1. Initiating the Queue
        const statusCheckQueue = new Queue("JobStatusCheck", {
            redis: {
                host: "127.0.0.1",
                port: 6379,
            },
        });
    
        const options = {            
            attempts: 3,
            removeOnFail: true,
            repeat: {
                every: 10000,
                limit: 5,
            },
        };
    
        // 2. Adding function in the job
        statusCheckQueue.process(async (job, callback) => {
            try {
                console.log('Processing job', job.id, job.data);
                callback();
    
            } catch (err) {
                console.log(err);
            }
        }).then(() => {
            console.log('suresh')
        }).catch((err) => {
            console.log(err)
        })
    
        // 3. Adding a Job to the Queue
        await statusCheckQueue.add( {user: '1'}, options).then((job) => {
            console.log('suresh first', job.id)
        }).catch((err) => {
            console.log(err)
        })
    
        // 4. Listener
        statusCheckQueue.on("error", (err) => {
            console.log(`Job error ${err}`);
        });
    
        statusCheckQueue.on("progress", function (job, progress) {
            // A job's progress was updated!
        });
    
        statusCheckQueue.on("completed", (job, result) => {
            console.log("Job completed", job.data);
        });
    
        statusCheckQueue.on("failed", function (job, err) {
            // A job failed with reason `err`!
            console.log(`Job not completed failed ${err}`);
        });
    }
    

    };

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search