
I have 900k records with 300 fields each (a (900k, 300) shape) in a MongoDB collection.
When I try to read the data into pandas, memory consumption increases dramatically until the process is killed.
I should mention that the data fits in memory (~1.5 GB) when I read it from a CSV file.

My machine has 32 GB of RAM and 16 CPUs, running CentOS 7.

My simple code:

from pymongo import MongoClient
import pandas as pd

client = MongoClient(host, port)
collection = client[db_name][collection_name]
cursor = collection.find()
df = pd.DataFrame(list(cursor))

My multiprocessing code:

import concurrent.futures
import multiprocessing

import pandas as pd
from pymongo import MongoClient


def read_mongo_parallel(skipses):
    print("Starting process")
    client = MongoClient(skipses[4], skipses[5])
    db = client[skipses[2]]
    collection = db[skipses[3]]
    print("range of {} to {}".format(skipses[0], skipses[0] + skipses[1]))

    cursor = collection.find().skip(skipses[0]).limit(skipses[1])

    return list(cursor)


# skipesess: list of (skip, limit, db_name, collection_name, host, port) tuples, built elsewhere
all_lists = []
with concurrent.futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as executor:
    for rows in executor.map(read_mongo_parallel, skipesess):
        all_lists.extend(rows)

df = pd.DataFrame(all_lists)

The memory increases with both methods until the kernel is killed.

What am I doing wrong?

6 Answers


  1. Chosen as BEST ANSWER

    I have found a solution with multiprocessing, and it is the fastest.

    import multiprocessing as mp
    from time import time

    import pandas as pd
    from pymongo import MongoClient

    # HOST, PORT, DB_NAME, COLLECTION_NAME and get_collection_size() are defined elsewhere.

    def chunks(collection_size, n_cores=mp.cpu_count()):
        """Yield (start, end) index tuples, one chunk per core."""
        batch_size = round(collection_size / n_cores)
        rest = collection_size % batch_size
        cumulative = 0
        for i in range(n_cores):
            cumulative += batch_size
            if i == n_cores - 1:
                yield (batch_size * i, cumulative + rest)
            else:
                yield (batch_size * i, cumulative)

    def parallel_read(skipses, host=HOST, port=PORT):
        print('Starting process on range of {} to {}'.format(skipses[0], skipses[1]))
        client = MongoClient(host, port)
        db = client[DB_NAME]
        collection = db[COLLECTION_NAME]

        cursor = collection.find({}, {'_id': False})
        _df = pd.DataFrame(list(cursor[skipses[0]:skipses[1]]))
        return _df

    def read_mongo(colc_size, _workers=mp.cpu_count()):
        pool = mp.Pool(processes=_workers)
        results = [pool.apply_async(parallel_read, args=(chunk,))
                   for chunk in chunks(colc_size, n_cores=_workers)]
        output = [p.get() for p in results]
        temp_df = pd.concat(output)
        return temp_df

    time_0 = time()
    df = read_mongo(get_collection_size())
    print("Reading database with {} processes took {}".format(mp.cpu_count(), time() - time_0))
    
    
    Starting process on range of 0 to 53866
    Starting process on range of 323196 to 377062
    Starting process on range of 430928 to 484794
    Starting process on range of 538660 to 592526
    Starting process on range of 377062 to 430928
    Starting process on range of 700258 to 754124
    Starting process on range of 53866 to 107732
    Starting process on range of 484794 to 538660
    Starting process on range of 592526 to 646392
    Starting process on range of 646392 to 700258
    Starting process on range of 215464 to 269330
    Starting process on range of 754124 to 807990
    Starting process on range of 807990 to 915714
    Starting process on range of 107732 to 161598
    Starting process on range of 161598 to 215464
    Starting process on range of 269330 to 323196
    

    Reading database with 16 processes took 142.64860558509827

    With the iterator2dataframes example from one of the other answers (no multiprocessing):

    def iterator2dataframes(iterator, chunk_size: int):
      """Turn an iterator into multiple small pandas.DataFrame
    
      This is a balance between memory and efficiency
      """
      records = []
      frames = []
      for i, record in enumerate(iterator):
        records.append(record)
        if i % chunk_size == chunk_size - 1:
          frames.append(pd.DataFrame(records))
          records = []
      if records:
        frames.append(pd.DataFrame(records))
      return pd.concat(frames)
    
    time_0 = time()
    cursor = collection.find()
    chunk_size = 1000
    df = iterator2dataframes(cursor, chunk_size)
    print("Reading database with chunksize = {} took {}".format(chunk_size,time()-time_0))
    

    Reading database with chunksize = 1000 took 372.1170778274536

    time_0 = time()
    cursor = collection.find()
    chunk_size = 10000
    df = iterator2dataframes(cursor, chunk_size)
    print("Reading database with chunksize = {} took {}".format(chunk_size,time()-time_0))
    

    Reading database with chunksize = 10000 took 367.02637577056885


  2. This test harness creates 900k (albeit small) records and runs fine on my stock laptop. Give it a try.

    import pymongo
    import pandas as pd
    
    db = pymongo.MongoClient()['mydatabase']
    db.mycollection.drop()
    operations = []
    
    for i in range(900000):
        operations.append(pymongo.InsertOne({'a': i}))
    
    db.mycollection.bulk_write(operations, ordered=False)
    cursor = db.mycollection.find({})
    df = pd.DataFrame(list(cursor))
    print(df.count())
    
  3. The problem is in the list usage when you build the DataFrame.
    The cursor is consumed all at once, making a list with 900k dictionaries inside it, which takes a lot of memory.

    You can avoid that if you create an empty DataFrame and then pull the documents in batches, a few documents at a time, appending them to the DataFrame.

    def batched(cursor, batch_size):
        batch = []
        for doc in cursor:
            batch.append(doc)
            if batch and not len(batch) % batch_size:
                yield batch
                batch = []
    
        if batch:   # last documents
            yield batch
    
    df = pd.DataFrame()
    for batch in batched(cursor, 10000):
        df = df.append(batch, ignore_index=True)
    

    10000 seems like a reasonable batch size, but you may want to change it according to your memory constraints: the higher it is, the faster this will end, but also the more memory it will use while running.
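
    If you are on a recent pandas release where DataFrame.append has been removed (pandas 2.0+), a minimal sketch of the same batching idea is to build one small frame per batch and concatenate them once at the end, reusing the batched generator above:

    # Same batching approach, adapted for pandas >= 2.0 (DataFrame.append removed):
    # build one small frame per batch, then concatenate them in a single pass.
    frames = [pd.DataFrame(batch) for batch in batched(cursor, 10000)]
    df = pd.concat(frames, ignore_index=True)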

    UPDATE: Added some benchmarks

    Note that this approach does not necessarily make the query take longer;
    rather the opposite, since what actually takes time is pulling the
    documents out of MongoDB as dictionaries and allocating them into a list.

    Here are some benchmarks with 300k documents that show how this
    approach, with the right batch_size, is actually even faster than pulling
    the whole cursor into a list:

    • The whole cursor into a list
    %%time
    
    df = pd.DataFrame(list(db.test.find().limit(300000)))
    

    CPU times: user 35.3 s, sys: 2.14 s, total: 37.5 s
    Wall time: 37.7 s

    • batch_size=10000 <- FASTEST
    %%time
    
    df = pd.DataFrame()
    for batch in batched(db.test.find().limit(300000), 10000):
        df = df.append(batch, ignore_index=True)
    

    CPU times: user 29.5 s, sys: 1.23 s, total: 30.7 s
    Wall time: 30.8 s

    • batch_size=1000
    %%time
    
    df = pd.DataFrame()
    for batch in batched(db.test.find().limit(300000), 1000):
        df = df.append(batch, ignore_index=True)
    

    CPU times: user 44.8 s, sys: 2.09 s, total: 46.9 s
    Wall time: 46.9 s

    • batch_size=100000
    %%time
    
    df = pd.DataFrame()
    for batch in batched(db.test.find().limit(300000), 100000):
        df = df.append(batch, ignore_index=True)
    

    CPU times: user 34.6 s, sys: 1.15 s, total: 35.8 s
    Wall time: 36 s

  4. You can try to get the data from MongoDB in chunks using cursor slicing, i.e. fetch 100000 documents at a time, append them to a DataFrame, and then fetch the next 100000 documents and append those as well.

    from pymongo import MongoClient
    import pandas as pd

    client = MongoClient(host, port)
    collection = client[db_name][collection_name]

    maxrows = 905679
    df = pd.DataFrame()
    for i in range(0, maxrows, 100000):
        if i + 100000 < maxrows:
            cursor = collection.find()[i:i + 100000]
        else:
            cursor = collection.find()[i:maxrows]
        df2 = pd.DataFrame(list(cursor))
        # DataFrame.append returns a new frame, so assign the result back
        df = df.append(df2, ignore_index=True)

    Refer to the link below to learn more about cursor slicing in PyMongo.

    https://api.mongodb.com/python/current/api/pymongo/cursor.html

  5. Load the data in chunks.

    Using iterator2dataframes from https://stackoverflow.com/a/39446008/12015722

    def iterator2dataframes(iterator, chunk_size: int):
      """Turn an iterator into multiple small pandas.DataFrame
    
      This is a balance between memory and efficiency
      """
      records = []
      frames = []
      for i, record in enumerate(iterator):
        records.append(record)
        if i % chunk_size == chunk_size - 1:
          frames.append(pd.DataFrame(records))
          records = []
      if records:
        frames.append(pd.DataFrame(records))
      return pd.concat(frames)
    
    client = MongoClient(host,port)
    collection = client[db_name][collection_name]
    cursor = collection.find()
    
    df = iterator2dataframes(cursor, 1000)
    
  6. Just wanted to make you all aware of pymongoarrow, which is officially developed by MongoDB and solves this problem. It can output query results to Arrow tables or pandas DataFrames and is, according to the docs, the preferred way of loading data from MongoDB into pandas. It sure worked like a charm for me!
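
    A minimal sketch of what that can look like, assuming pymongoarrow is installed and reusing the host/port/db/collection placeholders from the question:

    # Sketch only: host, port, db_name and collection_name are the placeholders from the question.
    from pymongo import MongoClient
    from pymongoarrow.api import find_pandas_all

    client = MongoClient(host, port)
    collection = client[db_name][collection_name]

    # Builds the DataFrame directly from the query result, without first
    # materializing a 900k-element list of dictionaries.
    df = find_pandas_all(collection, {})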
