
I have an application that makes API calls to another system, and it queues these API calls in a queue using Bull and Redis.

However, occasionally it gets bogged down with lots of API calls, or something stops working properly, and I want an easy way for users to check whether the system is just "busy". Otherwise, if they perform some action and it hasn’t completed 10 minutes later, they keep retrying it, and we end up with a backlog of extra entries (and in some cases data issues, such as duplicate parts being issued).

Here’s what a single "key" looks like for a successful API call in the queue:

HSET "bull:webApi:4822" "timestamp" "1639085540683"
HSET "bull:webApi:4822" "returnvalue" "{"id":"e1df8bb4-fb6c-41ad-ba62-774fe64b7882","workOrderNumber":"WO309967","status":"success"}"
HSET "bull:webApi:4822" "processedOn" "1639085623027"
HSET "bull:webApi:4822" "data" "{"id":"e1df8bb4-fb6c-41ad-ba62-774fe64b7882","token":"eyJ0eXAiOiJKV1QiL....dQVyEpXt64Fznudfg","workOrder":{"members":{"lShopFloorLoad":true,"origStartDate":"2021-12-09T00:00:00","origRequiredQty":2,"requiredQty":2,"requiredDate":"2021-12-09T00:00:00","origRequiredDate":"2021-12-09T00:00:00","statusCode":"Released","imaItemName":"Solid Pin - Black","startDate":"2021-12-09T00:00:00","reference":"HS790022053","itemId":"13840402"}},"socketId":"3b9gejTZjAXsnEITAAvB","type":"Create WO"}"
HSET "bull:webApi:4822" "delay" "0"
HSET "bull:webApi:4822" "priority" "0"
HSET "bull:webApi:4822" "name" "__default__"
HSET "bull:webApi:4822" "opts" "{"lifo":true,"attempts":1,"delay":0,"timestamp":1639085540683}"
HSET "bull:webApi:4822" "finishedOn" "1639085623934"

You can see in this case it took about 83 seconds from being queued to finishing. (1639085623 − 1639085540, i.e. finishedOn minus timestamp, in seconds)
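To make that arithmetic explicit, here is a minimal sketch that splits the total into queue wait and processing time. It assumes the three fields are the millisecond epoch strings shown in the hash above; `jobTimings` is a hypothetical helper name, not something Bull provides.

```javascript
// Hypothetical helper: split a Bull job hash's lifetime into wait and run time.
// Assumes timestamp, processedOn and finishedOn are millisecond epoch strings.
function jobTimings(hash) {
  const queuedAt = Number(hash.timestamp);
  const startedAt = Number(hash.processedOn);
  const finishedAt = Number(hash.finishedOn);
  return {
    waitedMs: startedAt - queuedAt, // time spent sitting in the queue
    ranMs: finishedAt - startedAt, // time spent being processed
    totalMs: finishedAt - queuedAt, // queued-to-finished
  };
}

// Values copied from the bull:webApi:4822 hash above
const timings = jobTimings({
  timestamp: '1639085540683',
  processedOn: '1639085623027',
  finishedOn: '1639085623934',
});
console.log(timings);
```

For the job above, nearly all of the 83 seconds was spent waiting in the queue; the processing itself took under a second.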

I’d like to be able to provide summary metrics like:

  • Most recent API call was added to queue X seconds ago
  • Most recent successful API call completed X seconds ago and took XX seconds to
    complete.

I’d also like to be able to provide a list of the 50 most recent API calls, formatted in a nice way and tagged with "success", "pending", or "failed".
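Both the summary lines and the tags can be derived from the raw job hashes. The sketch below is one possible shape for that, not a definitive implementation. `tagJob`, `latestBy` and `summarize` are hypothetical names, and the tagging rule rests on an assumption about Bull's storage: that failed jobs carry a `failedReason` field and finished jobs carry `finishedOn`.

```javascript
// Hypothetical helpers; field names follow the job hash shown earlier.
// Assumption: Bull writes `failedReason` on jobs that failed and `finishedOn`
// on jobs that finished, so anything with neither is treated as pending.
function tagJob(hash) {
  if (hash.failedReason) return 'failed';
  if (hash.finishedOn) return 'success';
  return 'pending';
}

// Pick the hash with the largest numeric value in `field`, or null if none.
function latestBy(hashes, field) {
  return hashes.reduce(
    (best, h) => (best === null || Number(h[field]) > Number(best[field]) ? h : best),
    null
  );
}

// Build the summary metrics from an array of job hashes.
function summarize(hashes, nowMs = Date.now()) {
  const toSeconds = (ms) => Math.round(ms / 1000);
  const lastAdded = latestBy(hashes, 'timestamp');
  const lastOk = latestBy(hashes.filter((h) => tagJob(h) === 'success'), 'finishedOn');
  return {
    lastAddedSecondsAgo: lastAdded ? toSeconds(nowMs - Number(lastAdded.timestamp)) : null,
    lastSuccessSecondsAgo: lastOk ? toSeconds(nowMs - Number(lastOk.finishedOn)) : null,
    // Measured from enqueue to finish, like the 83 seconds computed above
    lastSuccessTookSeconds: lastOk
      ? toSeconds(Number(lastOk.finishedOn) - Number(lastOk.timestamp))
      : null,
  };
}

// Made-up sample data, only to exercise the helpers
const sampleJobs = [
  { timestamp: '1000', processedOn: '2000', finishedOn: '5000' },
  { timestamp: '6000' },
  { timestamp: '3000', finishedOn: '4000', failedReason: 'boom' },
];
console.log(sampleJobs.map(tagJob));
console.log(summarize(sampleJobs, 10000));
```

The check for `failedReason` comes first on purpose, since a failed job may also have a `finishedOn` value.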

I’m fairly new to Redis and Bull, and I’m trying to figure out how to query this data (using Redis in Node.js) and return this data as JSON to the application.

I can pull a list of keys like this:

const { createClient } = require('redis') // node-redis v4

// @route   GET /status
async function status(req, res) {
  const client = createClient({
    url: `redis://${REDIS_SERVER}:6379`
  });
  try {
      client.on('error', (err) => console.log('Redis Client Error', err));
      await client.connect();
      const value = await client.keys('*');
      res.json(value)

  } catch (error) {
      console.log('ERROR getting status: ', error.message, new Date())
      res.status(500).json({ message: error.message })
  } finally {
    client.quit()
  }
}

Which will return ["bull:webApi:3","bull:webApi:1","bull:webApi:2"…]

But how can I pull the values associated to the respective keys?

And how can I find the key with the highest number and then pull the details for the "last 50"? In SQL, it would be like doing an ORDER BY key_number DESC LIMIT 50, but I’m not sure how to do it in Redis.
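Redis has no ORDER BY over key names, so one way to get the SQL-like behaviour is to sort the key list client-side by its numeric suffix. The sketch below assumes the `bull:webApi:` prefix from the output above; `latestJobKeys` is a hypothetical helper. Each key it returns could then be read with `client.hGetAll(key)` in node-redis v4.

```javascript
// Hypothetical helper: emulate ORDER BY id DESC LIMIT n over key names.
// Keys whose suffix is not a plain number (Bull's bookkeeping keys) are skipped.
function latestJobKeys(keys, prefix = 'bull:webApi:', limit = 50) {
  const idOf = (key) => Number(key.slice(prefix.length));
  return keys
    .filter((key) => key.startsWith(prefix) && /^\d+$/.test(key.slice(prefix.length)))
    .sort((a, b) => idOf(b) - idOf(a)) // numeric sort, so 10 comes before 3
    .slice(0, limit);
}

// Made-up key list in the same shape as the KEYS output above
const exampleKeys = [
  'bull:webApi:3',
  'bull:webApi:1',
  'bull:webApi:10',
  'bull:webApi:id',
  'bull:webApi:2',
];
const newestKeys = latestJobKeys(exampleKeys, 'bull:webApi:', 3);
console.log(newestKeys);
```

Keep in mind that KEYS walks the whole keyspace and blocks Redis while it runs, so this is only reasonable on a small database.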

2 Answers


  1. Chosen as BEST ANSWER

    So I've figured out how to pull the data I need. I'm not saying it's a good method, and I'm open to suggestions; but it seems to work to provide a filtered JSON return with the needed data, without changing how the queue functions.

    Here's what it looks like:

    // @route   GET /status/:listNum
    async function status(req, res) {
      const { listNum = 10} = req.params
      const client = createClient({
        url: `redis://${REDIS_SERVER}:6379`
      });
      try {
          client.on('error', (err) => console.log('Redis Client Error', err));
          await client.connect();
          // Find size of queue database
          const total_keys = await client.sendCommand(['DBSIZE']);
          const upper_value = total_keys;
          const lower_value = total_keys - listNum;
          // Generate array
          const range = (start, stop) => Array.from({ length: (start - stop) + 1}, (_, i) => start - (i));
          var queue_ids = range(upper_value, lower_value)
          queue_ids = queue_ids.filter(function(x){ return x > 0 }); // Filter out zero and negative ids
          // Current timestamp in seconds
          const current_timestamp = Math.floor(Date.now() / 1000); // drop milliseconds ("now")
          var response = []; // Initialize array
          for (const id of queue_ids) { // Loop through the candidate ids
            // Query value
            var value = await client.HGETALL('bull:webApi:'+id);
            if(Object.keys(value).length !== 0){ // if returned a value
              // Grab most of the request (exclude the token & socketId to save space, not used)
              var request_data = JSON.parse(value.data)
              request_data.token = '';
              request_data.socketId = '';
              // Grab & calculate desired times
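              // Pending jobs have no processedOn/finishedOn yet, so guard before slicing
              const processedOn = value.processedOn ? Number(value.processedOn.slice(0, -3)) : null; // drop milliseconds ("start")
              const finishedOn = value.finishedOn ? Number(value.finishedOn.slice(0, -3)) : null; // drop milliseconds ("done")
              const duration = processedOn !== null && finishedOn !== null ? finishedOn - processedOn : null; // (seconds)
              const elapsedSinceStart = processedOn !== null ? current_timestamp - processedOn : null;
              const elapsedSinceFinished = finishedOn !== null ? current_timestamp - finishedOn : null;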
              // Grab the returnValue
              const return_data = value.returnvalue; // the hash field is all lowercase
              // ignoring queue keys of: opts, priority, delay, name, timestamp
              const object_data = {request_data: request_data, processedOn: processedOn, finishedOn: finishedOn, return_data: return_data, duration: duration, elapsedSinceStart: elapsedSinceStart, elapsedSinceFinished: elapsedSinceFinished }
              response.push(object_data);
            }
          }
          
          res.json(response);
    
      } catch (error) {
          console.log('ERROR getting status: ', error.message, new Date());
          res.status(500).json({ message: error.message });
      } finally {
        client.quit();
      }
    }
    

    It's looping the Redis query, so I wouldn't want to use this for hundreds of keys, but for 10 or even 50 I'm thinking it should work.
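If the one-at-a-time loop becomes a bottleneck, the reads can be issued together. This is a rough sketch under the assumption that `client` is a connected node-redis v4 client; as far as I know, v4 pipelines commands issued in the same tick, so `Promise.all` avoids waiting for one round trip per key. `fetchJobHashes` and `fakeClient` are hypothetical names used only for illustration.

```javascript
// Sketch: fetch several job hashes concurrently instead of awaiting one by one.
// `client` is assumed to be a connected node-redis v4 client (or anything
// exposing an hGetAll(key) method that returns a promise).
async function fetchJobHashes(client, ids, prefix = 'bull:webApi:') {
  const hashes = await Promise.all(ids.map((id) => client.hGetAll(prefix + id)));
  // Missing keys come back as empty objects; drop them and keep the id.
  return hashes
    .map((hash, i) => ({ id: ids[i], ...hash }))
    .filter((job) => Object.keys(job).length > 1);
}

// Stand-in client so the sketch runs without a Redis server.
const fakeClient = {
  store: { 'bull:webApi:2': { name: '__default__', delay: '0' } },
  async hGetAll(key) {
    return this.store[key] || {};
  },
};

fetchJobHashes(fakeClient, [3, 2, 1]).then((jobs) => console.log(jobs));
```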

    For now I've resorted to getting the total number of keys and working backwards:

    await client.sendCommand(['DBSIZE']);
    

    In my case it returns a total slightly higher than the highest key id (there are a handful of extra status keys), which gets close enough, and then I just filter out any ids that return nothing.

    I've looked at ZRANGE a bit, but I can't figure out how to get it to give me the last id. When I have a Redis database (Bull Queue) like this:

    [screenshot: redis]

    If there's a simple Redis command I can run that will return "3", I'd probably use that instead. (since bull:webApi:3 has the highest number)

    (In actual use case, this might be 9555 or some high number; I just want to get the highest numbered key that exists.)

    For now I'll try using the method I've come up with above.
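A possible alternative to DBSIZE: as far as I can tell from how Bull lays out its keys, it keeps a counter of the last auto-generated job id in a plain string key named after the queue, which here would be `bull:webApi:id` (so `GET bull:webApi:id` in redis-cli). The sketch below reads that counter and builds the id list from it. The key name is an assumption based on the default `bull` prefix, and `lastJobId`, `recentIds` and `stubClient` are hypothetical names.

```javascript
// Sketch: read Bull's job id counter instead of estimating it from DBSIZE.
// Assumption: the queue is named webApi with the default "bull" prefix, so the
// counter is the string key bull:webApi:id. `client` is a connected
// node-redis v4 client (anything with an async get(key) works here).
async function lastJobId(client, queueName = 'webApi', prefix = 'bull') {
  const raw = await client.get(`${prefix}:${queueName}:id`);
  return raw === null ? 0 : Number(raw); // null means the counter key does not exist
}

// Ids of the `count` most recent jobs, newest first, never below 1.
function recentIds(lastId, count) {
  const length = Math.max(0, Math.min(count, lastId));
  return Array.from({ length }, (_, i) => lastId - i);
}

// Stand-in client so the sketch runs without a Redis server.
const stubClient = {
  async get(key) {
    return key === 'bull:webApi:id' ? '3' : null;
  },
};

lastJobId(stubClient).then((lastId) => console.log(lastId, recentIds(lastId, 10)));
```

This only tracks ids that Bull generated itself; jobs added with a custom jobId would not move the counter.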


  2. I’m a bit late here, but if you’re not set on manually digging around in Redis, I think Bull’s API, in particular Queue#getJobs(), has everything you need here, and should be much easier to work with. Generally, you shouldn’t have to reach into Redis to do any common tasks like this, that’s what Bull is for!

    If I understand your goal correctly, you should be able to do something like:

    const Queue = require('bull')
    
    async function status (req, res) {
      const { listNum = 10 } = req.params
      const api_queue = new Queue('webApi', `redis://${REDIS_SERVER}:6379`)
      const current_timestamp_sec = new Date().getTime() / 1000 // convert to seconds
    
      const recent_jobs = await api_queue.getJobs(null, 0, listNum)
      const results = recent_jobs.map(job => {
        const processed_on_sec = job.processedOn / 1000
        const finished_on_sec = job.finishedOn / 1000
        return {
          request_data: job.data,
          return_data: job.returnvalue,
          processedOn: processed_on_sec,
          finishedOn: finished_on_sec,
          duration: finished_on_sec - processed_on_sec,
          elapsedSinceStart: current_timestamp_sec - processed_on_sec,
          elapsedSinceFinished: current_timestamp_sec - finished_on_sec
        }
      })
      await api_queue.close() // release the Redis connections this Queue instance opened
      res.json(results)
    }
    

    That will get you the most recent listNum* jobs in your queue. I haven’t tested this full code, and I’ll leave the error handling and adding of your custom fields to the job data up to you, but the core of it is solid and I think that should meet your needs without ever having to think about how Bull stores things in Redis.

    I also included a suggestion on how to deal with timestamps a bit more nicely: you don’t need string processing to convert milliseconds to seconds. If you need them to be integers, you can wrap them in Math.floor().

    * at least that many, anyway – see the second note below


    A couple notes:

    • The first argument of getJobs() is a list of statuses, so if you want to look at just completed jobs, you can pass ['completed'], or completed and active, do ['completed', 'active'], etc. If no list is provided (null) it defaults to all statuses.
    • As mentioned in the reference I linked, the limit here is per state – so you’ll likely get more than listNum jobs back. It doesn’t seem like that should be a problem for your use case, but if it is, you can sort the list returned (probably by job id) and just return the first listNum – you’re guaranteed to get at least that many (assuming there are that many jobs in your queue), and won’t get more than 6*listNum (since there are 6 states).
    • Folks new to Bull can get nervous about instantiating a Queue object to do stuff like this, but don’t be! By itself a Queue instance doesn’t do anything; it’s just an interface to the given queue. It won’t start processing jobs until you call process() to add a processor. This is, incidentally, also how you’d add jobs from a different process than the one running your queues, and of course nothing will be added unless you call add().
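The sort-and-trim step from the second note could look roughly like this. It is a sketch that assumes the job ids are Bull's auto-generated numeric ids, so a higher id means a newer job; `newestJobs` is a hypothetical helper, and the objects below only mimic the `id` property of real Job instances.

```javascript
// Sketch: trim the per-state results down to the newest `limit` jobs.
// Assumes job ids are auto-generated numbers, so a higher id means a newer job.
function newestJobs(jobs, limit) {
  return [...jobs]
    .filter(Boolean) // tolerate null entries, in case a job vanished while being read
    .sort((a, b) => Number(b.id) - Number(a.id))
    .slice(0, limit);
}

// Made-up stand-ins for the Job objects returned by getJobs()
const fetched = [{ id: '7' }, { id: '12' }, null, { id: '9' }];
const trimmed = newestJobs(fetched, 2);
console.log(trimmed);
```

In the handler above, this would sit between the getJobs() call and the map(), for example `newestJobs(recent_jobs, Number(listNum))`.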