skip to Main Content

In my Node/Express application, I need to schedule jobs (connecting with LinkedIn profiles) based on the start date-time and end date-time that the user selects from a calendar in the UI. Each job should execute on a small selected batch of data, repeating until the date-time requirements are met.

I am using the npm package Bull to handle job queues and scheduling, but it doesn't seem to work. The job (connecting with LinkedIn profiles, or a simple console.log for testing purposes) is executed immediately, as soon as the timer is set or a new job is created from the UI, rather than at the defined start time.

Here is my code:

const Queue = require("bull");
const mongoose = require("mongoose");
const schedulerJobs = require("../api/models/schedulerJobs");

var id = mongoose.Types.ObjectId();
var HTMLParser = require("node-html-parser");
var { fetchSendConnectionData } = require("./services");
var { setLeadName } = require("./utils");

/**
 * Creates a Bull queue named "Campaign", adds one repeatable job carrying
 * `leads` as its data, and registers a processor plus event listeners on it.
 *
 * A new Queue instance, processor and set of listeners are created on every
 * call of this function.
 *
 * @param {*} userId - Stored on the schedulerJobs record when a job completes.
 * @param {*} cookieName - Passed as `name` to fetchSendConnectionData.
 * @param {*} leads - Used as the job data; presumably an array of objects
 *   with `profileUrl` and `name` fields (inferred from the processor) — TODO confirm.
 * @param {*} start - Passed to Bull as `repeat.startDate`.
 * @param {*} end - Passed to Bull as `repeat.endDate`.
 * @param {string} campaign_name - Used as the job name and in log messages.
 * @param {string} connection_message - HTML message; converted to plain text before use.
 * @param {*} campaign_id - Stored on the schedulerJobs record when a job completes.
 * @returns {Promise<undefined>} Nothing is returned explicitly.
 */
module.exports = async function connectionScheduler(
  userId,
  cookieName,
  leads,
  start,
  end,
  campaign_name,
  connection_message,
  campaign_id
) {
  // Index of the lead currently being handled; shared by all processor runs
  // through this closure.
  var i = 0;
  // Strip the HTML markup from the message and keep its structured text.
  var parse_message = HTMLParser.parse(
    connection_message
  ).structuredText.toString();

  // 1. Initiating the Queue
  const campaignQueue = new Queue("Campaign", {
    redis: {
      host: "127.0.0.1",
      port: 6379,
    },
  });

  const data = leads;

  // NOTE(review): the cron expression has six fields, so it presumably
  // includes a seconds field (every 2 seconds) — confirm against Bull's cron
  // parser. `delay` is set together with `repeat`; how Bull combines the two
  // is not visible here — TODO confirm.
  const options = {
    attempts: 2,
    delay: 5000,
    repeat: {
      cron: "*/2 * * * * *",
      tz: "America/Los_Angeles",
      startDate: start,
      endDate: end,
    },
  };

  // 2. Adding a Job to the Queue
  // The result of add() is neither awaited nor checked for errors.
  campaignQueue.add(`${campaign_name}`, data, options);

  // 3. Consumer
  campaignQueue.process(async (job) => {
    // The queue is closed as the very first step of handling a job.
    // NOTE(review): this presumably prevents any later repetition from being
    // processed — confirm.
    campaignQueue.close();

    // The handler returns here, so every statement below this line is
    // unreachable.
    return await console.log("CRON started.....");

    console.log(
      `Connection Cron For ${campaign_name} Now Points To Index => ${i}`
    );
    // NOTE(review): this code indexes `job` directly (`job[i]`, `job.length`),
    // while the leads were handed to add() as the job's data — it likely
    // should read `job.data[i]`; confirm.
    fetchSendConnectionData({
      name: cookieName,
      profile_links: job[i].profileUrl,
      message: setLeadName(parse_message, job[i].name),
    })
      .then(({ data: { message } }) => {
        if (message === "Connection send") {
          console.log(
            `Its Been 5 Minutes And Connection Request Is Sent To => ${job[i].name}`
          );
        } else {
          console.log(
            `Connection Request To => ${job[i].name} Has Failed So Now We Move On To The Next One`
          );
        }
        // Advance to the next lead regardless of the outcome.
        i = i + 1;
        // NOTE(review): `job.close()` is called on the job object; whether a
        // Bull job exposes close() is not visible here — TODO confirm.
        if (i === job.length) {
          job.close();
        }
      })
      .catch((err) => {
        console.log(
          `Connection Request To => ${job[i].name} Has Failed Due To => ${err.message}`
        );
        i = i + 1;
        if (i === job.length) {
          job.close();
        }
      });
  });

  // 4. Listener
  // Queue errors are received here but not logged or handled.
  campaignQueue.on("error", (err) => {
    // console.log(`Job completed with result ${result}`);
  });

  campaignQueue.on("progress", function (job, progress) {
    // A job's progress was updated!
  });

  // Each completed job is recorded as a schedulerJobs document.
  campaignQueue.on("completed", (job, result) => {
    // console.log("job completed", job);

    //save job completed to database
    const jobdetail = {
      userId,
      start,
      end,
      campaign_id,
      campaign_name,
      connection_message,
      jobId: job.opts.jobId,
    };

    const schedulerjobs = new schedulerJobs(jobdetail);
    // The save() promise has no rejection handler.
    schedulerjobs.save().then((scheduledjob) => console.log("Job saved to db"));
  });

  // Failed jobs are received here but not logged or handled.
  campaignQueue.on("failed", function (job, err) {
    // A job failed with reason `err`!
  });
};

As soon as the server is started, it immediately prints the output:

CRON started.....
Job saved to db

It does not wait for the start time, and it runs only once instead of continuing to run until the end time.

Kindly help to resolve this

4

Answers


  1. Any line of code after return will be ignored.
    Try this:

        // Before: the early return made every statement after it unreachable.
        // return await console.log("CRON started.....");
        // After: just log and carry on. console.log is synchronous and returns
        // undefined, so no `await` is needed.
        console.log("CRON started.....");
    

    Put an empty return at the end of the function if needed.

    Login or Signup to reply.
  2. You can use the repeat feature of Bull:

    // "*/5 * * * *" fires every 5 minutes; "5 * * * *" would fire only once an
    // hour, at minute 5.
    queue.add({ your_data: "" }, { repeat: { cron: "*/5 * * * *" } });
    

    The above code will run the job every 5 minutes.

    https://optimalbits.github.io/bull/

    See the repeatable-jobs section of the documentation above.

    Login or Signup to reply.
  3. To repeat a job every 5 minutes, use the following code:

    // Bull's `repeat.every` option takes the interval in milliseconds.
    const FIVE_MINUTES_MS = 5 * 60 * 1000;
    const repeatOptions = { repeat: { every: FIVE_MINUTES_MS } };

    // Enqueue the repeatable job; it is scheduled again after each interval.
    const myJob = await myqueue.add({ foo: 'bar' }, repeatOptions);
    
    Login or Signup to reply.
  4. The mistake in your code is fixed; check the solution below:

    const Queue = require('bull');
    
    module.exports = {
    
    jobStatusCheckScheduler: async function () {
        console.log('hi')
        // 1. Initiating the Queue
        const statusCheckQueue = new Queue("JobStatusCheck", {
            redis: {
                host: "127.0.0.1",
                port: 6379,
            },
        });
    
        const options = {
            removeOnFail: true,
            attempts: 3,
            repeat: {
                every: 10000,
                limit: 5,
            },
        };
    
        // 2. Adding function in the job
        statusCheckQueue.process(async (job, callback) => {
            try {
                console.log('Processing job', job.id, job.data);
                callback();
    
            } catch (err) {
                console.log(err);
            }
        }).then(() => {
            console.log('suresh')
        }).catch((err) => {
            console.log(err)
        })
    
        // 3. Adding a Job to the Queue
        await statusCheckQueue.add( {user: '1'}, options).then((job) => {
            console.log('suresh first', job.id)
        }).catch((err) => {
            console.log(err)
        })
    
        // 4. Listener
        statusCheckQueue.on("error", (err) => {
            console.log(`Job error ${err}`);
        });
    
        statusCheckQueue.on("progress", function (job, progress) {
            // A job's progress was updated!
        });
    
        statusCheckQueue.on("completed", (job, result) => {
            console.log("Job completed", job.data);
        });
    
        statusCheckQueue.on("failed", function (job, err) {
            // A job failed with reason `err`!
            console.log(`Job not completed failed ${err}`);
        });
    }
    

    };

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search