I have two functions, scheduleScan()
and scan()
.
scan()
calls scheduleScan()
when there’s nothing else to do except scheduling a new scan, so scheduleScan()
can schedule a scan()
. But there’s a problem, some jobs run twice.
I want to make sure that only one job is being processed at any given time. How can I achieve that? I believe it has something to do with done()
, (it was in scan(), removed now) but I couldn’t come up with a solution.
Bull version: 3.12.1
Important late edit: scan()
calls another functions and they may or may not call other functions, but they’re all sync functions, so they only call a function when their own jobs are completed, there is only one way forward. At the end of the “tree”, I call it, the last function calls scheduleScan(), but there can’t be two simultaneous jobs running. Every single job starts at scan()
, by the way, and they end with scheduleScan(stock, period, milliseconds, 'called by file.js')
export function update(job) {
// does some calculations, then it may call scheduleScan() or
// it may call another function, and that could be the one calling
// scheduleScan() function.
// For instance, a function like finalize()
}
export function scan(job) {
update(job)
}
import moment from 'moment'
import stringHash from 'string-hash'
const opts = { redis: { port: 6379, host: '127.0.0.1', password: mypassword' } }
let queue = new Queue('scan', opts)
queue.process(1, (job) => {
job.progress(100).then(() => {
scan(job)
})
})
export function scheduleScan (stock, period, milliseconds, triggeredBy) {
let uniqueId = stringHash(stock + ':' + period)
queue.getJob(uniqueId).then(job => {
if (!job) {
if (milliseconds) {
queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
// console.log('Added with ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
} else {
queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
// console.log('Added without ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
}
} else {
job.getState().then(state => {
if (state === 'completed') {
job.remove().then(() => {
if (milliseconds) {
queue.add({ stock, period, triggeredBy }, { delay: milliseconds, jobId: uniqueId }).then(() => {
// console.log('Added with ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
} else {
queue.add({ stock, period, triggeredBy }, { jobId: uniqueId }).then(() => {
// console.log('Added without ms: ' + stock + ' ' + period)
}).catch(err => {
if (err) {
console.log('Can not add because it exists ' + new Date())
}
})
}
}).catch(err => {
if (err) {
// console.log(err)
}
})
}
}).catch(err => {
// console.log(err)
})
}
})
}
2
Answers
The problem, I believe is your
scan
function is async. So yourjob.progress
function callsscan
and then immediately callsdone
allowing the queue to process another job.A solution could be to pass the
done
callback as a parameter to yourscan
andscheduleScan
functions, and invoke it, once you have completed your job (or on error).Another (better) solution could be to ensure that you always return a
Promise
fromscan
andscheduleScan
, then await the promise to resolve and then calldone
. If doing this, make sure you chain all your promise returns in yourscheduleScan
function.The scan function is an asynchronous function. In you
queue.process()
function you have to await the scan function and then call thedone()
callback.Try this! I’ve tried to refactor the code a bit by using async-await.