
Basically, each client (identified by a clientId) can push messages, and it is important that a second message from the same client is not processed until the first one has finished processing. A client can send multiple messages in a row, and they are ordered; messages from different clients should ideally not interfere with each other. And, importantly, a job should never be processed twice.

I thought that using Redis I might be able to solve this, so I started with some quick prototyping using the bull library, but I am clearly not doing it right and was hoping someone would know how to proceed.

This is what I tried so far:

  1. Create jobs and add them to the same queue from one process, using the clientId as the job name.
  2. Consume jobs from 2 separate processes, waiting large random amounts of time in each handler.
  3. I tried the default locking provided by the library I am using (bull), but it locks on the jobId, which is unique for each job, not on the clientId.

What I would want to happen:

  • No consumer should be able to take a job for a given clientId until the previous job for that clientId has finished processing.
  • They should, however, be able to take items from different clientIds in parallel without problems (asynchronously). (I haven’t gotten this far; right now I am dealing with only one clientId.)

What I get:

  • Both consumers consume as many items as they can from the queue, without waiting for the previous item for the same clientId to complete.

Is Redis even the right tool for this job?

Example code

// ./setup.ts
import Queue from 'bull';
import * as uuid from 'uuid';

// Check that while a message from a client is being processed, no other message from that client is taken

// To do that test, have two processes that consume messages and one that creates messages, and make each job take a long time

// queue for each room https://stackoverflow.com/questions/54178462/how-does-redis-pubsub-subscribe-mechanism-works/54243792#54243792
// https://groups.google.com/forum/#!topic/redis-db/R09u__3Jzfk

// Make a job not be called stalled, waiting enough time https://github.com/OptimalBits/bull/issues/210#issuecomment-190818353

export async function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
export interface JobData {
  id: string;
  v: number;
}
export const queue = new Queue<JobData>('messages', 'redis://127.0.0.1:6379');

queue.on('error', (err) => {
  console.error('Uncaught error on queue.', err);
  process.exit(1);
});

export function clientId(): string {
  return uuid.v4();
}

export function randomWait(minms: number, maxms: number): Promise<void> {
  const ms = Math.random() * (maxms - minms) + minms;
  return sleep(ms);
}

// Make a job not be called stalled, waiting enough time https://github.com/OptimalBits/bull/issues/210#issuecomment-190818353
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
//@ts-ignore
queue.LOCK_RENEW_TIME = 5 * 60 * 1000;

// ./create.ts
import { queue, randomWait } from './setup';

const MIN_WAIT = 300;
const MAX_WAIT = 1500;
async function createJobs(n = 10): Promise<void> {
  await randomWait(MIN_WAIT, MAX_WAIT);
  // always the same clientId ('one'), since Math.random() is never > 1
  const clientId = Math.random() > 1 ? 'zero' : 'one';
  for (let index = 0; index < n; index++) {
    await randomWait(MIN_WAIT, MAX_WAIT);
    const job = { id: clientId, v: index };
    await queue.add(clientId, job).catch(console.error);
    console.log('Added job', job);
  }
}

export async function create(nIds = 10, nItems = 10): Promise<void> {
  const jobs = [];
  await randomWait(MIN_WAIT, MAX_WAIT);
  for (let index = 0; index < nIds; index++) {
    await randomWait(MIN_WAIT, MAX_WAIT);
    jobs.push(createJobs(nItems));
    await randomWait(MIN_WAIT, MAX_WAIT);
  }
  await randomWait(MIN_WAIT, MAX_WAIT);
  await Promise.all(jobs);
  process.exit();
}

(function mainCreate(): void {
  create().catch((err) => {
    console.error(err);
    process.exit(1);
  });
})();

// ./consume.ts
import { queue, randomWait, clientId } from './setup';

function startProcessor(minWait = 5000, maxWait = 10000): void {
  queue
    .process('*', 100, async (job) => {
      console.log('LOCKING: ', job.lockKey());
      await job.takeLock();
      const name = job.name;
      const processingId = clientId().split('-', 1)[0];
      try {
        console.log('START: ', processingId, '\tjobName:', name);
        await randomWait(minWait, maxWait);
        const data = job.data;
        console.log('PROCESSING: ', processingId, '\tjobName:', name, '\tdata:', data);
        await randomWait(minWait, maxWait);
        console.log('PROCESSED: ', processingId, '\tjobName:', name, '\tdata:', data);
        await randomWait(minWait, maxWait);
        console.log('FINISHED: ', processingId, '\tjobName:', name, '\tdata:', data);
      } catch (err) {
        console.error(err);
      } finally {
        await job.releaseLock();
      }
    })
    .catch(console.error); // Catch initialization errors
}

startProcessor();

This is run using 3 different processes, which you might launch like this (although I use separate tabs for a clearer view of what is happening):

npx ts-node consume.ts & 
npx ts-node consume.ts &
npx ts-node create.ts &


Answers


  1. I’m not familiar with Node.js, but for Redis I would try the following.

    Let’s say you have client_1 and client_2, which both publish events.
    You have three machines: consumer_1, consumer_2, and consumer_3.

    1. Establish a list of tasks in Redis, e.g. JOB_LIST.
    2. Clients put (LPUSH) jobs into this JOB_LIST in a specific form, like "CLIENT_1:[jobcontent]" or "CLIENT_2:[jobcontent]".
    3. Each consumer takes out jobs (with the Redis RPOP command, or BRPOP to block while the list is empty) and processes them.
      For example, consumer_1 takes out a job whose content is CLIENT_1:[jobcontent]. It parses the content and recognizes it is from CLIENT_1. Then it checks whether some other consumer is already processing CLIENT_1; if not, it locks a key to indicate that it is processing CLIENT_1.

    It goes on to set a key "CLIENT_1_PROCESSING", with "consumer_1" as its content, using the Redis SETNX command (set if the key does not exist), with an appropriate timeout. For example, if the task normally takes one minute to finish, set a timeout of five minutes on the key, just in case consumer_1 crashes and holds the lock indefinitely.

    If the SETNX returns 0, it failed to acquire the lock for CLIENT_1 (someone is already processing a job of client_1). It then returns the job (the value "CLIENT_1:[jobcontent]") to the left side of JOB_LIST using the Redis LPUSH command. It might then wait a bit (sleep a few seconds) and RPOP another task from the right side of the list. If this time SETNX returns 1, consumer_1 acquires the lock. It goes on to process the job; after it finishes, it deletes the key "CLIENT_1_PROCESSING", releasing the lock. Then it goes on to RPOP another job, and so on; a sketch of this flow follows below.

    Some things to consider:

    1. The JOB_LIST is not fair; e.g., earlier jobs might be processed later.
    2. The locking part is a bit rudimentary, but it will suffice.
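
    Here is a minimal sketch of that flow in TypeScript, assuming the ioredis client; the key names (JOB_LIST, CLIENT_x_PROCESSING) come from above, while processJob is a hypothetical stand-in for the real work. It uses SET with the NX and EX options, the modern one-command form of SETNX plus a timeout:

    import Redis from 'ioredis';

    const redis = new Redis('redis://127.0.0.1:6379');
    const JOB_LIST = 'JOB_LIST';
    const LOCK_TTL_SECONDS = 5 * 60; // lock expires in case a consumer crashes

    // Hypothetical stand-in for the actual work.
    async function processJob(client: string, content: string): Promise<void> {
      console.log(`processing ${content} for ${client}`);
    }

    async function consumeOnce(consumerName: string): Promise<void> {
      // Take a job from the right side of the list.
      const raw = await redis.rpop(JOB_LIST);
      if (!raw) return;

      // Jobs are assumed well-formed, like "CLIENT_1:[jobcontent]".
      const sep = raw.indexOf(':');
      const client = raw.slice(0, sep);
      const content = raw.slice(sep + 1);

      // SET key value EX ttl NX: atomically acquire the per-client lock.
      const acquired = await redis.set(`${client}_PROCESSING`, consumerName, 'EX', LOCK_TTL_SECONDS, 'NX');
      if (acquired !== 'OK') {
        // Someone else is processing this client: put the job back and move on.
        await redis.lpush(JOB_LIST, raw);
        return;
      }
      try {
        await processJob(client, content);
      } finally {
        await redis.del(`${client}_PROCESSING`); // release the lock
      }
    }

    Each consumer would call consumeOnce in a loop, sleeping briefly when the list is empty or the lock is taken.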

    Update:

    I’ve figured out another way to keep tasks in order.

    For each client (producer), build a list, like "client_1_list", and push jobs into the left side of the list.
    Save all the client names in a list "client_names_list", with values "client_1", "client_2", etc.

    Each consumer (processor) iterates over "client_names_list". For example, consumer_1 gets "client_1" and checks whether the key for client_1 is locked (someone is already processing a task of client_1); if not, it right-pops a value (job) from client_1_list and locks client_1. If client_1 is locked, it sleeps for a second or so and moves on to the next client, "client_2" for example, checking its keys in the same way.

    This way, each client’s (task producer’s) tasks are processed in the order they came in; a sketch follows below.
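
    A minimal sketch of this variant, again assuming ioredis and the hypothetical key names above (client_names_list, client_1_list, and the per-client lock keys):

    import Redis from 'ioredis';

    const redis = new Redis('redis://127.0.0.1:6379');

    // Hypothetical stand-in for the actual work.
    async function processJob(client: string, content: string): Promise<void> {
      console.log(`processing ${content} for ${client}`);
    }

    // One pass over all clients; run this in a loop with a short sleep between passes.
    async function consumeRound(consumerName: string): Promise<void> {
      const clients = await redis.lrange('client_names_list', 0, -1);
      for (const client of clients) {
        // Skip clients another consumer is already processing.
        const locked = await redis.set(`${client}_PROCESSING`, consumerName, 'EX', 300, 'NX');
        if (locked !== 'OK') continue;
        try {
          // Jobs were LPUSHed, so RPOP yields them in each client's insertion order.
          const job = await redis.rpop(`${client}_list`);
          if (job) await processJob(client, job);
        } finally {
          await redis.del(`${client}_PROCESSING`);
        }
      }
    }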

  2. EDIT: I found why BullJS was starting jobs in parallel on one processor: we were using named jobs and were defining many named process functions on one queue/processor. The default concurrency factor for a queue/processor is 1, so the queue should not process any jobs in parallel.

    The problem with our setup is that if you define many (named) process handlers on one queue, the concurrency adds up with each process handler function: if you define three named process handlers, you get a concurrency factor of 3 for the given queue, across all the defined named jobs.

    So just define one named job per queue for queues where parallel processing should not happen and all jobs should run sequentially, one after the other.

    That can be important, e.g. when pushing a high number of jobs onto the queue where the processing involves API calls that would return errors if made in parallel.
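
    As a minimal sketch of that configuration (the queue and job names here are hypothetical; the point is the single named process handler, which keeps the queue’s effective concurrency at 1):

    import Queue from 'bull';

    const sequentialQueue = new Queue('sequential-tasks', 'redis://127.0.0.1:6379');

    // Exactly one named process handler on this queue, with concurrency 1:
    // jobs are processed strictly one after the other.
    sequentialQueue.process('the-only-job-name', 1, async (job) => {
      // ... work that must not run in parallel, e.g. rate-limited API calls ...
      return job.data;
    });

    // Producers add all jobs under that same single name.
    void sequentialQueue.add('the-only-job-name', { payload: 42 });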

    The following text was my first approach to answering the OP’s question and describes just a workaround for the problem. So better to just go with my edit 🙂 and configure your queues the right way.


    I found an easy solution to the OP’s question.
    In fact, BullJS does process many jobs in parallel on one worker instance:
    if you have one worker instance up and running and push 10 jobs onto the queue, that worker may start all of them in parallel.

    My research on BullJS queues suggested that this is not intended behavior: one worker (also called a processor by BullJS) should only start a new job from the queue when it is idle, i.e. not processing a former job.

    Nevertheless, BullJS keeps starting jobs in parallel on one worker.

    In our implementation, that led to big problems during API calls, most likely caused by too many API calls at a time. Tests showed that when only one worker was started, the API calls finished just fine and returned status 200.

    So how do you process just one job after the other, each starting once the previous one is finished, if BullJS does not do that for us (just what the OP asked)?
    We first experimented with delays and other BullJS options, but those are workarounds rather than the exact solution to the problem we were looking for. At least we did not manage to stop BullJS from processing more than one job at a time that way.

    So we did it ourselves and started one job after the other.

    The solution was rather simple for our use case after looking into the BullJS API reference.

    We just used a for loop to start the jobs one after another. The trick was to use BullJS’s

    job.finished

    method to get a Promise that resolves once the job is finished. By using await inside the for loop, the next job starts immediately after the job.finished Promise is awaited (resolved). That’s the nice thing about for loops: await works in them!

    Here is a small code example of how to achieve the intended behavior:

    for (let i = 0; i < theValues.length; i++) {
      jobCounter++;

      const job = await this.processingQueue.add(
        'update-values',
        {
          value: theValues[i],
        },
        {
          // delay: i * 90000,
          // lifo: true,
        }
      );

      this.jobs[job.id] = {
        jobType: 'socket',
        jobSocketId: BackgroundJobTasks.UPDATE_VALUES,
        data: {
          value: theValues[i],
        },
        jobCount: theValues.length,
        jobNumber: jobCounter,
        cumulatedJobId,
      };

      await job.finished().then((val) => {
        console.log('job finished:: ', val);
      });
    }

    The important part is really

    await job.finished()

    inside the for loop. theValues.length jobs get started all just one after the other, as intended.

    That way, horizontally scaling jobs across more than one worker is no longer possible. Nevertheless, this workaround is okay for us at the moment.

    I will get in contact with OptimalBits, the maker of BullJS, to clear things up.
