
I am using Node.js with MongoDB.

I have this API, which is responsible for dumping data into a Reporting collection.

The payload data comes in bulk as an array.

This data is coming from different sensors installed in different stores.

Currently I have 5 Million records in Reporting collection

The API should dump this data into the Reporting collection as fast as possible, but before that there should be no duplication in the database based on:

tenantId, typeId, reportingType, storeCode, hour, startTimestamp, endTimestamp

It should always keep the latest records; the old records should be either deleted or replaced with the new ones.

Currently I am performing an upsert.

Now this logic is working perfectly fine.

But the main issue here is that this operation takes a lot of time and MongoDB CPU utilization goes up to 97%.

Here is my code:

importReportingData: async function (params) {

    try {
        
        if (!params.payload || !params.payload.length) return {statusCode: 3, message: 'payload is missing.', response: {params: params}};

        const bulkOps = [];

        if (params.payload && params.payload.length) {
            
            for (var i = 0; i < params.payload.length; i++) {
                
                const data = params.payload[i];
                const reportingType = data.reportingType;

                const query = {
                    tenantId: data.tenantId,
                    reportingId: data.reportingId,
                    reportingType: data.reportingType,
                    storeCode: data.storeCode,
                    hour: data.hour,
                    startTimestamp: { $gte: new Date(data.startTimestamp) },
                    endTimestamp: { $lte: new Date(data.endTimestamp) }
                };

                const update = {
                    $set: {
                        tenantId: data.tenantId,
                        reportingType: data.reportingType,
                        storeCode: data.storeCode,
                        reportingId: data.reportingId,
                        hour: data.hour,
                        month: data.month,
                        year: data.year,
                        dayName: data.dayName,
                        week: data.week,
                        startTimestamp: new Date(data.startTimestamp),
                        endTimestamp: new Date(data.endTimestamp),
                        traffic: data.countValue,
                        createdAt: new Date()
                    }
                };

                const bulkOp = {
                    updateOne: {
                        filter: query,
                        update: update,
                        upsert: true
                    }
                };

                bulkOps.push(bulkOp);
            }
        }

        console.log("Bulk Write Initiated....!");
        await Reporting.rawCollection().bulkWrite(bulkOps);
        console.log("---> Bulk Write Done <---");

        return {statusCode: 200, message: 'Success', response: {}};
    } 
    catch (err) {
        
        return {statusCode: 400, message: err.message, response: {}};
    }
}

Now is there any way to optimize this?

Edit

I have indexes created on following:

tenantId, typeId, reportingType, storeCode, hour, startTimestamp, endTimestamp
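
For reference, these were created individually as single field indexes, roughly like the following (illustrative mongosh commands):

db.Reporting.createIndex({ tenantId: 1 });
db.Reporting.createIndex({ typeId: 1 });
db.Reporting.createIndex({ reportingType: 1 });
db.Reporting.createIndex({ storeCode: 1 });
db.Reporting.createIndex({ hour: 1 });
db.Reporting.createIndex({ startTimestamp: 1 });
db.Reporting.createIndex({ endTimestamp: 1 });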

startTimestamp and endTimestamp are actually the complete timestamps for the start and end of the hour.

E.g

startTimestamp: 2023-06-30 13:00:00

endTimestamp: 2023-06-30 13:59:59

Currently I am sending a payload of 500 objects every 5 minutes. Even when I send 50 objects per payload, it takes time as soon as the dataset grows. Of course not as much as with a 500-object payload, but it still takes a lot of time, and during that period CPU usage is at 100+ percent.

2 Answers


  1. Looking at your code, I don't know the payload size, but I would break it into smaller parts, for example batches of 100.
    The other thing is to create an index with your query attributes.
    Your code in batches of 100 would look something like this (I didn't execute it):

    importReportingData: async function (params) {
      try {
        if (!params.payload || !params.payload.length) {
          return {
            statusCode: 3,
            message: 'payload is missing.',
            response: { params: params },
          };
        }
    
        const batchSize = 100; // Number of items to process per batch
        const payload = params.payload;
    
        const totalBatches = Math.ceil(payload.length / batchSize);
    
        for (let batchIndex = 0; batchIndex < totalBatches; batchIndex++) {
          const startIndex = batchIndex * batchSize;
          const endIndex = startIndex + batchSize;
          const currentBatch = payload.slice(startIndex, endIndex);
    
          const bulkOps = [];
    
          for (let i = 0; i < currentBatch.length; i++) {
            const data = currentBatch[i];
            const reportingType = data.reportingType;
    
            // Your existing code for generating the bulk operation
    
            bulkOps.push(bulkOp);
          }
    
          console.log(`Bulk Write Initiated for Batch ${batchIndex + 1}....!`);
          await Reporting.rawCollection().bulkWrite(bulkOps);
          console.log(`---> Bulk Write Done for Batch ${batchIndex + 1} <---`);
        }
    
        return { statusCode: 200, message: 'Success', response: {} };
      } catch (err) {
        return { statusCode: 400, message: err.message, response: {} };
      }
    }
    
  2. But the main issue here is that this operation takes a lot of time and MongoDB CPU utilization goes up to 97%.

    Now is there any way to optimize this?

    Overall you haven’t definitively provided enough information for us to confidently determine the root of your problem. But per the edit and the comments we can take an educated guess at the problem and its associated resolution. In addition to the elevated CPU utilization, we have two other clues. The first is the query portion of the update itself, namely:

    const query = {
      tenantId: data.tenantId,
      reportingId: data.reportingId,
      reportingType: data.reportingType,
      storeCode: data.storeCode,
      hour: data.hour,
      startTimestamp: { $gte: new Date(data.startTimestamp) },
      endTimestamp: { $lte: new Date(data.endTimestamp) }
    };
    

    And then the indexes present on the collection:

    I have indexes created on following:

    tenantId, typeId, reportingType, storeCode, hour, startTimestamp, endTimestamp

    It sounds like these are all single field indexes. In that situation then, for each of them, the database can consider a possible plan that resembles the following:

    • Scan the index for the matching values of that single field. For example to find all documents with the appropriate hour.
    • For all of those matches, retrieve the full document and apply all of the other query conditions (tenantId, reportingType, startTimestamp) to filter out documents that don’t match.

    What is probably happening here is that it is the combination of all of the query predicates that are selective as opposed to a single one. This effectively leaves the database with no good query plans, as every option needs to scan many (potentially orders of magnitude) more documents than those that are actually targeted for modification. This is very wasteful and, assuming that much of the data set is in memory, very CPU intensive.

    The solution here is to provide an index that the database can use to create an efficient ("good") execution plan. Creating such an index will look similar to the following command in a shell session:

    db.getSiblingDB("<DBNAME>").Reporting.createIndex({ 
      tenantId: 1, 
      reportingId: 1,  
      reportingType: 1, 
      storeCode: 1, 
      hour: 1, 
      startTimestamp: 1, 
      endTimestamp: 1 
    });
    

    You should replace <DBNAME> and Reporting as needed to match your environment. This index will allow the database to retrieve almost exactly the index keys and documents that are targeted by the operation. That approach will be much more efficient and will likely reduce the CPU utilization (and duration) of the operation significantly (depending on the selectivity of the query on this collection).

    Further troubleshooting would require the output of .explain("allPlansExecution") for the operation.
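
    For illustration (not taken from the original operation), that output can be generated for a single operation with the same filter shape from mongosh, roughly as follows; the literal values are placeholders standing in for one payload item:

    db.getSiblingDB("<DBNAME>").Reporting.explain("allPlansExecution").update(
      {
        // Placeholder values for one payload item
        tenantId: "tenant-1",
        reportingId: "type-1",
        reportingType: "traffic",
        storeCode: "store-1",
        hour: 13,
        startTimestamp: { $gte: new Date("2023-06-30T13:00:00Z") },
        endTimestamp: { $lte: new Date("2023-06-30T13:59:59Z") }
      },
      // The update body does not affect plan selection; a single $set is enough here
      { $set: { traffic: 100 } },
      { upsert: true }
    );

    Comparing totalKeysExamined and totalDocsExamined across the candidate plans in that output shows how much extra scanning each plan performs relative to the documents actually matched.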

    So technically the other indexes I created on the same columns individually won't have an effect alongside the compound index?

    If I understand your question in the comments correctly, then the answer is "yes". For the most part, databases typically only use a single index per query/operation. The aforementioned compound index will almost certainly be used by this operation, and it makes the single field index on { tenantId: 1 } likely redundant and safe to remove. We cannot say anything about whether or not the other indexes can be removed as information about the other queries in your workload is not available. You can find some more general information on the topic here in the documentation.
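
    As a sketch of that cleanup (check your own index list first; <DBNAME> is the same placeholder as above):

    // List the indexes that currently exist on the collection
    db.getSiblingDB("<DBNAME>").Reporting.getIndexes();

    // Drop the single field index on tenantId; its key is the leading
    // prefix of the compound index created above, so it is redundant
    db.getSiblingDB("<DBNAME>").Reporting.dropIndex({ tenantId: 1 });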
