skip to Main Content

I am trying to send DTOs to my Redis queue with a Bull framework and handle these DTOs in processors. Sometimes job pass to processor (1 of 100) but most of the time failed with error: job stalled more than allowable limit and I have no idea how to fix it.

I give you a small intro and below you can see my code. I have created queue-api module which serve as wrapper for my queues, for instance order queue. This module then I import to modules from which I want publish DTO into queues, in my case order-module.

queue-api module files

// queue-api.module.ts
@Module({
  imports: [
    BullModule.registerQueue(
      {
        name: 'order-queue',
        defaultJobOptions: {
          backoff: 10000,
          attempts: Number.MAX_SAFE_INTEGER,
        },
      },
    ),
    ...
  ],
  providers: [OrderQueue],
  exports: [OrderQueue],
})
export class QueueApiModule {}

// order-queue.ts
@Injectable()
export class OrderQueue extends AbstractQueue {
  constructor(
    @InjectQueue('order-queue')
    private readonly queue: Queue,
  ) {}

  async sendSubmitMail(dto: SendSubmitMailDto): Promise<void> {
    const job = await this.queue.add('send-submit-mail', dto)
    console.log(`Job ${job.id} created.`)
  }
}

order-module files

// order.module.ts
@Module({
  imports: [
    QueueApiModule,
    ...
  ],
  providers: [
    OrderProcessor,
    ...
  ]
})
export class OrderModule {}

// order-processor.ts
@Processor('order-queue')
export class OrderProcessor {
  constructor(private readonly queue: OrderQueue) {}

  @Process('send-submit-mail')
  async onProcessSubmitMail(job: Job): Promise<void> {
    console.log(`Processing of job ${job.id}`)
  }
}

this processor handler is almost never called.

Do you have any idea what is wrong with my code? Thank you in advice.

2

Answers


  1. I’m getting similar problem, but haven’t drilled down to find the root cause yet. But for meantime I used bull-repl (npm bull-repl ) cli to see the queue status. When stalled error occurs, none job will be triggered after that (seems like queue is stuck on the failed job). If you run stats in bull-repl you’ll see that there’s a job in Active state. you can manually remove it (using bull-repl) and then you’ll get next job running. I suspect that QueueScheduler isn’t running therefore stalled jobs are not taken cared of. You can also increase the stalled timeouts params (there’re 2-3 of them, check [https://docs.bullmq.io/bull/important-notes]) to see if it helps.
    In my case, the lock happens when I pause in debug.

    Login or Signup to reply.
  2. A little late, it’s still better to have it written here

    It happens because of this line
    constructor(private readonly queue: OrderQueue) {}

    And more precisely it’s because the DI mechanism,
    Probably the reason the service is Scope.REQUEST
    (Or one of its injected services, which makes the host service also a Scope.REQUEST service, the whole injection sub-tree is request scoped)

    @Process() runs the handler in separate process, and therefore has no access to the Injector.

    If you look at the error that results from trying to process the job.data, yo’ll see something like this (In my case trying to inject EmailService):
    stacktrace ["TypeError: this.request.get is not a functionn at new EmailService (/Users/stephan/projects/platform-api/src/messaging/email/service/email.service.ts:36:50)n at Injector.instantiateClass (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:340:19)n at callback (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:53:45)n at processTicksAndRejections (node:internal/process/task_queues:95:5)n at Injector.loadInstance (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:57:13)n at Injector.loadProvider (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:84:9)n at Injector.resolveComponentHost (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:202:13)n at async Promise.all (index 0)n at Injector.loadCtorMetadata (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:368:23)n at Injector.resolveConstructorParams (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:99:26)","TypeError: this.request.get is not a functionn at new EmailService (/Users/stephan/projects/platform-api/src/messaging/email/service/email.service.ts:36:50)n at Injector.instantiateClass (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:340:19)n at callback (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:53:45)n at processTicksAndRejections (node:internal/process/task_queues:95:5)n at Injector.loadInstance (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:57:13)n at Injector.loadProvider (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:84:9)n at Injector.resolveComponentHost (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:202:13)n at async Promise.all (index 0)n at Injector.loadCtorMetadata (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:368:23)n at Injector.resolveConstructorParams (/Users/stephan/projects/platform-api/node_modules/@nestjs/core/injector/injector.js:99:26)"]

    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search