
I have a query like:

query = HistoryLogs.query()
query = query.filter(HistoryLogs.exec_id == exec_id)
iter = query.iter()

for ent in iter:
    # write log to file, nothing memory intensive

I added logging inside the for loop and found that reading 10K rows increases memory usage by about 200MB, reading the next 10K rows adds another 200MB, and so on. Reading 100K rows requires 2GB, which exceeds the highmem instance memory limit.

I tried clearing the NDB in-context cache in the for loop, after every 10K rows read, by adding:

                # clear ndb cache in order to reduce memory footprint
                context = ndb.get_context()
                context.clear_cache()

in the for loop, on every 10,000th iteration, but this caused the query to time out and NDB raised BadRequestError: The requested query has expired. Please restart it with the last cursor to read more results.
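
Concretely, the loop looks roughly like this (a simplified sketch; write_log_line is just a stand-in for the actual file write):

    for i, ent in enumerate(query.iter()):
        # write log to file, nothing memory intensive
        write_log_line(ent)  # placeholder for the real file write

        if i and i % 10000 == 0:
            # clear ndb cache in order to reduce memory footprint
            ndb.get_context().clear_cache()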

My initial expectation was that by using query.iter() instead of query.fetch() I wouldn't face any memory issues and memory usage would stay pretty much constant, but that isn't the case. Is there a way to read the data with the iterator without exceeding either the time or the memory limit? By clearing the context cache I see that memory consumption stays pretty much constant, but then I run into trouble with the time it takes to retrieve all the rows.

By the way, there are a lot of rows to retrieve, up to 150K. Is it possible to get this done with some simple tweaks, or do I need a more complex solution, e.g. one that uses some parallelization?

2 Answers


  1. Are you running this in the remote-api shell? Otherwise I'd imagine App Engine's max request timeout would start to be a problem.

    You should definitely run this in Google Dataflow instead. It will parallelize the work for you and run faster.

    https://beam.apache.org/documentation/programming-guide/
    https://beam.apache.org/releases/pydoc/2.17.0/index.html
    https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/cookbook/datastore_wordcount.py

    I imagine your pipeline code would look something like this:

    import logging

    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions
    # These imports match the (older) v1 datastore connector used below;
    # newer Beam releases ship a v1new module with a slightly different API.
    from apache_beam.io.gcp.datastore.v1.datastoreio import ReadFromDatastore
    from google.cloud.proto.datastore.v1 import query_pb2
    from googledatastore import helper as datastore_helper, PropertyFilter


    def run(project, gcs_output_prefix, exec_id):

        def format_result(element):
            # element is a datastore entity protobuf; pull out the values you need
            csv_cells = [
                datastore_helper.get_value(element.properties['exec_id']),
                # TODO: extract other properties here
            ]
            return ','.join([str(cell) for cell in csv_cells])

        pipeline_options = PipelineOptions([])
        pipeline_options.view_as(SetupOptions).save_main_session = True

        p = beam.Pipeline(options=pipeline_options)

        # Build a datastore query: kind HistoryLogs, filtered on exec_id
        query = query_pb2.Query()
        query.kind.add().name = 'HistoryLogs'
        datastore_helper.set_property_filter(query.filter, 'exec_id', PropertyFilter.EQUAL, exec_id)

        _ = (p
             | 'read' >> ReadFromDatastore(project, query, None)
             | 'format' >> beam.Map(format_result)
             | 'write' >> beam.io.WriteToText(file_path_prefix=gcs_output_prefix,
                                              file_name_suffix='.csv',
                                              num_shards=1))  # limits output to a single file

        result = p.run()
        result.wait_until_finish()


    if __name__ == '__main__':
        logging.getLogger().setLevel(logging.INFO)
        run(project='YOUR-PROJECT-ID',
            gcs_output_prefix='gs://YOUR-PROJECT-ID.appspot.com/history-logs-export/some-time-based-prefix/',
            exec_id='1234567890')


    This code reads from Google Datastore and exports the results to Google Cloud Storage as CSV.
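
    Note that PipelineOptions([]) above uses the default (local) runner; to execute on the Dataflow service you would pass the standard runner/staging options instead, along these lines (the bucket path and region here are placeholders):

    pipeline_options = PipelineOptions(
        runner='DataflowRunner',
        project=project,
        temp_location='gs://YOUR-PROJECT-ID.appspot.com/dataflow-temp/',  # placeholder bucket path
        region='us-central1',  # pick your region
    )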

  2. A typical solution to avoid hitting both the memory and the request-processing-time limits when operating on a large number of entities obtained from datastore queries is to split the workload into manageable chunks using cursors and spread them across multiple requests (for example using push queue tasks), possibly staggered in time to also prevent instance explosion and contention on the output media (if any). A minimal sketch of the approach is included at the end of this answer.

    This way you can process virtually unlimited workloads, even if, for whatever reason, you cannot or will not use the nice Dataflow solution suggested by Alex.

    You can find an example of the technique in How to delete all the entries from google datastore?

    But be mindful of the cursor limitations; see Are GAE Datastore cursors permanent and durable?
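
    A minimal sketch of the cursor-plus-task-chunking idea on the first-generation App Engine standard runtime could look like this (BATCH_SIZE and process_chunk are illustrative names, HistoryLogs is the model from the question, and the deferred library is assumed to be enabled):

    from google.appengine.datastore.datastore_query import Cursor
    from google.appengine.ext import deferred, ndb

    BATCH_SIZE = 1000  # small enough to stay well within memory and deadline limits

    def process_chunk(exec_id, cursor_urlsafe=None):
        # rebuild the cursor passed along from the previous task, if any
        cursor = Cursor(urlsafe=cursor_urlsafe) if cursor_urlsafe else None

        query = HistoryLogs.query(HistoryLogs.exec_id == exec_id)
        entities, next_cursor, more = query.fetch_page(BATCH_SIZE, start_cursor=cursor)

        for ent in entities:
            pass  # write the log line for ent here

        # drop the cached entities so instance memory stays flat
        ndb.get_context().clear_cache()

        if more and next_cursor:
            # hand the next chunk off to a fresh push-queue task
            deferred.defer(process_chunk, exec_id, next_cursor.urlsafe())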
