
Background:

I have created a job which reads data from MongoDB and loads it into MS-SQL.

Current Behaviour:

Whenever the job runs, it fetches all the data from MongoDB.

Expected Behaviour:

When the job runs, it should fetch only the data that has not been loaded yet. I have a timestamp field in the MongoDB document.

Example

Timestamp: 2022-07-29T08:14:14.657+00:00

Solution 1:

I tried adding a condition to the Mongo query to load only the last 15 minutes of data.

The problem is that if, for example, my job component stays down for 1 hour, then when it comes back up, the next run will load only the last 15 minutes of data and we lose the other 45 minutes.

Required Solution:

If the job runs for the first time, it should extract data of all time and load it into SQL.

When the job runs again (say after 15 minutes), it should automatically pick up only the newly created rows and load just those.

Update

I have now written a complete article on this solution:
https://medium.com/@raowaqasakram/fetch-latest-data-from-mongodb-talend-1f21ba7b98b5

2 Answers


  1. You can consider adding a new property to the MongoDB collection model. For example, you can add a new property called viewed that is false by default.

    Then you can always query all documents that have the viewed property set to false, and immediately update that property on those documents to true. That way, they will not be fetched in the next call.

    You can do it with the updateMany() method (a plain update() would modify only the first matching document by default):

    db.collection.updateMany({
      "viewed": false
    },
    {
      "$set": {
        "viewed": true
      }
    })
    

    With this, it does not matter when you run the next query, since it will always return all the documents that have not yet been viewed.
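    The fetch-then-mark cycle above can be illustrated with a self-contained sketch. It uses a plain in-memory array as a stand-in for the MongoDB collection (names and data here are made up for illustration), but the control flow is the same: read everything unviewed, then flip the flag so the next run sees nothing.

```javascript
// In-memory stand-in for the MongoDB collection (illustrative data).
const collection = [
  { _id: 1, payload: "a", viewed: false },
  { _id: 2, payload: "b", viewed: true }, // already exported in an earlier run
  { _id: 3, payload: "c", viewed: false },
];

// Fetch every document that has not been exported yet...
function fetchUnviewed(docs) {
  const batch = docs.filter((d) => !d.viewed);
  // ...and immediately mark them, so the next run skips them.
  batch.forEach((d) => { d.viewed = true; });
  return batch;
}

const firstRun = fetchUnviewed(collection);  // picks up _id 1 and 3
const secondRun = fetchUnviewed(collection); // nothing new, so empty

console.log(firstRun.map((d) => d._id)); // [ 1, 3 ]
console.log(secondRun.length);           // 0
```

    Note that with a real database, the query and the flag update are two separate operations, so documents inserted between them could be marked without being exported; keeping the batch small (or updating only the exact `_id`s you fetched) narrows that window.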

  2. The soundest solution would be to get the last exported timestamp (assuming this field only increases) directly from the SQL table:

    // get the last timestamp from SQL (MS-SQL uses TOP 1 rather than LIMIT 1)
    const lastTimestamp = SQLClient("SELECT TOP 1 timestamp FROM sqltable ORDER BY timestamp DESC");
    
    const documentsToExport = db.collection.find({ timestamp: { $gt: new Date(lastTimestamp) }});
    
    ... export logic ...
    

    This way, even if the current job fails in the middle, you always know the last document uploaded. You should also make sure you insert documents in timestamp order to support this.
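    The checkpoint logic can be sketched end to end with in-memory stand-ins for both stores (table, collection, and data are illustrative, not from the original post):

```javascript
// In-memory stand-ins for the SQL table and the MongoDB collection.
const sqlRows = [
  { timestamp: new Date("2022-07-29T08:00:00Z") },
  { timestamp: new Date("2022-07-29T08:14:14Z") }, // latest exported row
];
const mongoDocs = [
  { _id: 1, timestamp: new Date("2022-07-29T08:10:00Z") }, // already exported
  { _id: 2, timestamp: new Date("2022-07-29T08:20:00Z") }, // new
  { _id: 3, timestamp: new Date("2022-07-29T09:05:00Z") }, // new
];

// Equivalent of SELECT TOP 1 timestamp ... ORDER BY timestamp DESC:
const lastTimestamp = sqlRows
  .map((r) => r.timestamp.getTime())
  .reduce((a, b) => Math.max(a, b), 0);

// Equivalent of find({ timestamp: { $gt: lastTimestamp } }), sorted ascending
// so a mid-job failure still leaves a consistent checkpoint in SQL.
const documentsToExport = mongoDocs
  .filter((d) => d.timestamp.getTime() > lastTimestamp)
  .sort((a, b) => a.timestamp - b.timestamp);

console.log(documentsToExport.map((d) => d._id)); // [ 2, 3 ]
```

    A first-ever run simply finds no rows in SQL, so the checkpoint defaults to the epoch and everything is exported, which matches the required behaviour.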


    You can also maintain a metadata collection that contains the job details, for example (in Node.js syntax):

    // get the last job saved.
    const lastJob = await db.jobCollection.findOne({}, { sort: { _id: -1 } });
    
    // if this is the first job timestamp will be 1970 otherwise use previous job timestamp.
    const nextTimestamp = lastJob?.timestamp ?? new Date('1970');
    
    const documentsToExport = await db.collection.find({ timestamp: { $gt: nextTimestamp } }).sort({ timestamp: 1 }).toArray();
    
    if (documentsToExport.length) {
        ... upload to sql ...
    
        // insert the latest timestamp from mongo; you can add additional
        // metadata fields here, like run date, time taken, documents inserted etc.
        await db.jobCollection.insertOne({
            timestamp: documentsToExport[documentsToExport.length - 1].timestamp,
            documentsInserted: documentsToExport.length,
            createdAt: new Date()
        })
    }
    

    Obviously this process is less fault tolerant, which is why I recommend you use the first one if possible.


    The last solution is what was offered in a different answer: add a field on each document that signals whether or not it has been uploaded. But for a larger collection it will require an additional index and a schema change, which I prefer to avoid when possible, and I feel that is the case here.
