
Dependency DAG

[Image: dependency DAG — Parquet source → intermediate frame → result_1 / result_2]

Description

Pretty straightforward: I am reading some Parquet files from disk using Polars as the data source, doing some moderately heavy processing (a few million rows) to generate an intermediate data frame, and then generating two results that need to be written back to a database.
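
For concreteness, the pipeline has roughly this shape (the file pattern, column names, and transformations are placeholders, not the actual workload):

import polars as pl

source = pl.scan_parquet("data/*.parquet")         # lazy source
intermediate = source.filter(pl.col("value") > 0)  # moderately heavy shared step
result_1 = intermediate.group_by("key").agg(pl.col("value").sum())
result_2 = intermediate.select(pl.col("value").mean())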

Technology Stack
  • Ubuntu 22.04
  • Python 3.10
  • Polars 1.2.1
Question

Polars recommends using lazy evaluation as far as possible to optimise execution. Now, the final results (result_1 and result_2) obviously have to be materialised.

But if I collect these two in sequence:

#! /usr/bin/env python3
# encoding: utf-8
import polars as pl
...
result_1.collect() # Materialise result 1
result_2.collect() # Materialise result 2

Is the transformation from the source to the intermediate frame (the common ancestor) repeated? If so, that is clearly undesirable. In that case, I would have to materialise the intermediate frame and then do the rest of the processing in eager mode, as sketched below.
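
That eager fallback would look something like this (continuing the placeholder sketch above):

df_intermediate = intermediate.collect()  # materialise the common ancestor once

result_1 = df_intermediate.group_by("key").agg(pl.col("value").sum())
result_2 = df_intermediate.select(pl.col("value").mean())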

Is there any documentation from Polars on the expected behaviour and recommended practices for this scenario?

2 Answers


  1. Try pl.collect_all to collect multiple dataframes:

    pl.collect_all([result_1, result_2])
    

    Reference: https://docs.pola.rs/api/python/stable/reference/api/polars.collect_all.html
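
    A self-contained sketch (the frames and expressions here are illustrative stand-ins). As I understand it, collect_all runs the plans together, and its comm_subplan_elim optimisation (on by default) is meant to cache subplans shared between the queries — worth verifying on your actual query:

    import polars as pl

    # illustrative stand-in for the real scan_parquet source
    lf = pl.LazyFrame({"a": [1, 2, 3]})
    intermediate = lf.with_columns((pl.col("a") * 10).alias("b"))

    result_1 = intermediate.select(pl.col("b").sum())
    result_2 = intermediate.select(pl.col("b").mean())

    # materialise both results in a single call
    df_1, df_2 = pl.collect_all([result_1, result_2])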

  2. Honestly, I think for production code your best bet is to collect() intermediate results and then reuse them in result_1 and result_2. It would be nice if collect_all() could find common subgraphs of the computation and cache them, but I don’t think that happens (although I haven’t really checked the Rust code).
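
    A minimal sketch of that approach (names are illustrative):

    import polars as pl

    lf = pl.LazyFrame({"a": [1, 2, 3]})

    # materialise the shared intermediate exactly once
    df_intermediate = lf.group_by("a").agg().collect()

    # derive both results from the in-memory frame; .lazy() keeps the
    # remaining steps lazily optimised without recomputing the aggregation
    df1 = df_intermediate.lazy().with_columns(pl.col.a * 2).collect()
    df2 = df_intermediate.lazy().with_columns((pl.col.a / 3).alias("c")).collect()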

    You could probably try some workaround via polars.concat():

    # let's say you have some intermediate LazyFrame with some calculations
    lf_intermediate = lf.group_by("a").agg()
    
    # and here you want to create 2 different results out of this frame.
    # You can add a 'partitioning' column so you can separate your results
    # after collection
    lf1 = lf_intermediate.with_columns(pl.col.a * 2, partition=pl.lit(1))
    lf2 = lf_intermediate.with_columns((pl.col.a / 3).alias("c"), partition=pl.lit(2))
    
    # create the combined result; 'diagonal' concat fills columns missing
    # from one side (here "c") with nulls
    df_result = pl.concat([lf1, lf2], how='diagonal').collect()
    
    # and now separate the results into different DataFrames
    df1 = df_result.filter(pl.col.partition == 1)
    df2 = df_result.filter(pl.col.partition == 2)
    

    You can see that the intermediate part is cached during the computation:

    pl.concat([lf1, lf2], how='diagonal').explain(optimized=True)
    
    UNION
      PLAN 0:
         WITH_COLUMNS:
         [[(col("a")) * (2)], dyn int: 1.alias("partition"), null.cast(Float64).alias("c")] 
          CACHE[id: 0, cache_hits: 1]
            AGGREGATE
                [] BY [col("a")] FROM
              DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: None
      PLAN 1:
        simple π 3/3 ["a", "partition", "c"]
           WITH_COLUMNS:
           [[(col("a")) / (3)].alias("c"), dyn int: 2.alias("partition")] 
            CACHE[id: 0, cache_hits: 1]
              AGGREGATE
                [] BY [col("a")] FROM
                DF ["a"]; PROJECT 1/1 COLUMNS; SELECTION: None
    END UNION
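
    After splitting, you will probably also want to drop the helper column:

    df1 = df_result.filter(pl.col.partition == 1).drop("partition")
    df2 = df_result.filter(pl.col.partition == 2).drop("partition")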
    