
I have a column of long strings (like sentences) on which I want to do the following:

  1. replace certain characters
  2. create a list of the remaining strings
  3. if a string is all text see whether it is in a dictionary and if so keep it
  4. if a string is all numeric keep it
  5. if a string is a mix of numeric/text, find ratio of numbers to letters and keep if above a threshold

I currently do this as follows:

            for memo_field in self.memo_columns:
                data = data.with_columns(
                    pl.col(memo_field).map_elements(
                        lambda x: self.filter_field(text=x, word_dict=word_dict))
                    )

The filter_field method uses plain python, so:

  • text_sani = re.sub(r'[^a-zA-Z0-9\s_\-%]', ' ', text) to replace
  • text_sani = text_sani.split(' ') to split
  • len(re.findall(r'[A-Za-z]', x)) to count the letters in each element of the text_sani list (similar for digits); the ratio is (letters - digits) divided by (letters + digits)
  • a list comprehension with an if to filter the list of words

It actually isn’t too bad: 128M rows take about 10 minutes. Unfortunately, future files will be much bigger. On a ~300M-row file this approach gradually increases memory consumption until the OS (Ubuntu) kills the process. Also, all processing seems to take place on a single core.

I have started to try to use the Polars string expressions; code and a toy example are provided below.

At this point it looks like my only option is a function call to do the rest. My questions are:

  1. in my original approach, is it normal that memory consumption grows? Does map_elements create a copy of the original series and so consume more memory?
  2. is my original approach correct, or is there a better way, e.g. using struct in Polars, which I have just started reading about?
  3. is it possible to do what I want using just Polars expressions?

UPDATE

The code examples in the answers from @Hericks and @ΩΠΟΚΕΚΡΥΜΜΕΝΟΣ were applied and largely addressed my third question. Implementing the Polars expressions greatly reduced run time, with two observations:

  1. the complexity of the memo fields in my use case greatly affects the run time. The key challenge is the lookup of items in the dictionary; a large dictionary and many valid words in the memo field can severely increase run time; and
  2. I experienced many segmentation fault errors when saving in .parquet format when I used pl.DataFrame. When using pl.LazyFrame and sink_parquet there were no errors, but run time was greatly extended (drives are NVMe SSDs at 2000 MB/s).

EXAMPLE CODE/DATA:

Toy data:

import polars as pl

temp = pl.DataFrame({"foo": ['COOOPS.autom.SAPF124',
                            'OSS REEE PAAA comp. BEEE  atm 6079 19000000070 04-04-2023',
                            'ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO',
                            'OSS REFF pagopago cost. Becf  atm 9682 50012345726 10-04-2023']
                    })

Code Functions:

import re

def num_dec(x):
    return len(re.findall(r'[0-9_/]', x))

def num_letter(x):
    return len(re.findall(r'[A-Za-z]', x))

def letter_dec_ratio(x):
    if len(x) == 0:
        return None
    nl = num_letter(x)
    nd = num_dec(x)
    if (nl + nd) == 0:       
        return None
    ratio = (nl - nd)/(nl + nd)
    return ratio
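
For intuition, here is a quick usage check of these helpers on a few tokens from the toy data (a sketch; the commented values follow directly from the definitions above):

# sanity check of the ratio helper on sample tokens
print(letter_dec_ratio("atm"))      # 1.0   -> letters only
print(letter_dec_ratio("6079"))     # -1.0  -> digits only
print(letter_dec_ratio("SAPF124"))  # ~0.14 -> 4 letters, 3 digits: (4 - 3) / (4 + 3)
print(letter_dec_ratio(""))         # None  -> empty token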

def filter_field(text=None, word_dict=None):

    if type(text) is not str or word_dict is None:
        return 'no memo and/or dictionary'

    if len(text) > 100:
        text = text[0:101]
    print("TEXT: ",text)
    text_sani = re.sub(r'[^a-zA-Z0-9\s_\-%]', ' ', text) # parse by replacing most artifacts and symbols with space

    words = text_sani.split(' ') # create words separated by spaces
    print("WORDS: ",words)

    ratios = [letter_dec_ratio(w) for w in words]
    kept = [
        w.lower()
        for w, r in zip(words, ratios)
        if r is not None
        and (r == -1 or -0.7 <= r <= 0 or (r == 1 and w.lower() in word_dict))
    ]
    print("FINAL: ",' '.join(kept))

    return ' '.join(kept)

Code Current Implementation:

temp.with_columns(
                pl.col("foo").map_elements(
                    lambda x: filter_field(text=x, word_dict=['cost','atm'])).alias('clean_foo') # baseline
                )

Code Partial Attempt w/Polars:

This gets me the correct WORDS (compare with the expected results below):

temp.with_columns(
    (
        pl.col("foo")
        .str.replace_all(r'[^a-zA-Z0-9\s_\-%]', ' ')
        .str.split(' ')
    )
)

Expected Result (at each step, see print statements above):

TEXT:  COOOPS.autom.SAPF124
WORDS:  ['COOOPS', 'autom', 'SAPF124']
FINAL:  
TEXT:  OSS REEE PAAA comp. BEEE  atm 6079 19000000070 04-04-2023
WORDS:  ['OSS', 'REEE', 'PAAA', 'comp', '', 'BEEE', '', 'atm', '6079', '19000000070', '04-04-2023']
FINAL:  atm 6079 19000000070 04-04-2023
TEXT:  ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO
WORDS:  ['ABCD', '600000000397', '7667896-6', 'REG', 'REF', 'REE', 'PREPREO', 'HMO']
FINAL:  600000000397 7667896-6
TEXT:  OSS REFF pagopago cost. Becf  atm 9682 50012345726 10-04-2023
WORDS:  ['OSS', 'REFF', 'pagopago', 'cost', '', 'Becf', '', 'atm', '9682', '50012345726', '10-04-2023']
FINAL:  cost atm 9682 50012345726 10-04-2023

2 Answers

  1. The filtering can be implemented using polars’ native expression API as follows. I’ve taken the regular expressions from the naive implementation in the question.

    word_list = ["cost", "atm"]
    
    # to avoid long expressions in ``pl.Expr.list.eval``
    num_dec_expr = pl.element().str.count_matches(r'[0-9_/]').cast(pl.Int32)
    num_letter_expr = pl.element().str.count_matches(r'[A-Za-z]').cast(pl.Int32)
    ratio_expr = (num_letter_expr - num_dec_expr) / (num_letter_expr + num_dec_expr)
    
    (
        temp
        .with_columns(
            pl.col("foo")
            # convert to lowercase
            .str.to_lowercase()
            # replace special characters with space
            .str.replace_all(r"[^a-z0-9\s_\-%]", " ")
            # split string at spaces into list of words
            .str.split(" ")
            # filter list of words
            .list.eval(
                pl.element().filter(
                    # only keep non-empty strings...
                    pl.element().str.len_chars() > 0,
                    # ...that either 
                    # - are in the list of words,
                    # - consist only of characters related to numbers,
                    # - have a ratio between -0.7 and 0
                    pl.element().is_in(word_list) | num_letter_expr.eq(0) | ratio_expr.is_between(-0.7, 0)
                )
            )
            # join list of words into string
            .list.join(" ")
            .alias("foo_clean")
        )
    )
    
    shape: (4, 2)
    ┌───────────────────────────────────────────────────────────────┬──────────────────────────────────────┐
    │ foo                                                           ┆ foo_clean                            │
    │ ---                                                           ┆ ---                                  │
    │ str                                                           ┆ str                                  │
    ╞═══════════════════════════════════════════════════════════════╪══════════════════════════════════════╡
    │ COOOPS.autom.SAPF124                                          ┆                                      │
    │ OSS REEE PAAA comp. BEEE  atm 6079 19000000070 04-04-2023     ┆ atm 6079 19000000070 04-04-2023      │
    │ ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO           ┆ 600000000397 7667896-6               │
    │ OSS REFF pagopago cost. Becf  atm 9682 50012345726 10-04-2023 ┆ cost atm 9682 50012345726 10-04-2023 │
    └───────────────────────────────────────────────────────────────┴──────────────────────────────────────┘
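
    As a usage note: since the question applies this cleaning to several memo columns in a loop, the same chain can be broadcast over all of them at once by passing a list of names to pl.col and suffixing the output columns. A minimal sketch, reusing word_list and the expressions above (memo_columns is a stand-in for the question's self.memo_columns):

    memo_columns = ["foo"]  # stand-in for self.memo_columns in the question

    temp.with_columns(
        pl.col(memo_columns)
        .str.to_lowercase()
        .str.replace_all(r"[^a-z0-9\s_\-%]", " ")
        .str.split(" ")
        .list.eval(
            pl.element().filter(
                pl.element().str.len_chars() > 0,
                pl.element().is_in(word_list)
                | num_letter_expr.eq(0)
                | ratio_expr.is_between(-0.7, 0),
            )
        )
        .list.join(" ")
        .name.suffix("_clean")
    )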
    
  2. Let’s see if I can help with the performance concerns. There may be a way to get your algorithm to run in a performance envelope that is acceptable for your system.

    For the benchmarking below, I created the following dataset of over a billion records, along with some extra columns (to simulate data other than the string column that is being processed).

    shape: (1_073_741_824, 6)
    ┌───────────────────────────────────┬────────┬───────┬─────┬─────────────────────┬─────────────────────┐
    │ foo                               ┆ string ┆ float ┆ int ┆ datetime            ┆ date                │
    │ ---                               ┆ ---    ┆ ---   ┆ --- ┆ ---                 ┆ ---                 │
    │ str                               ┆ str    ┆ f32   ┆ i32 ┆ datetime[μs]        ┆ datetime[μs]        │
    ╞═══════════════════════════════════╪════════╪═══════╪═════╪═════════════════════╪═════════════════════╡
    │ COOOPS.autom.SAPF124              ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
    │ OSS REEE PAAA comp. BEEE  atm 60… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
    │ ABCD 600000000397/7667896-6/REG.… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
    │ OSS REFF pagopago cost. Becf  at… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
    │ COOOPS.autom.SAPF124              ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
    │ …                                 ┆ …      ┆ …     ┆ …   ┆ …                   ┆ …                   │
    │ OSS REFF pagopago cost. Becf  at… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
    │ COOOPS.autom.SAPF124              ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
    │ OSS REEE PAAA comp. BEEE  atm 60… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
    │ ABCD 600000000397/7667896-6/REG.… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
    │ OSS REFF pagopago cost. Becf  at… ┆ string ┆ 1.0   ┆ 1   ┆ 2024-03-01 00:00:00 ┆ 2024-03-01 00:00:00 │
    └───────────────────────────────────┴────────┴───────┴─────┴─────────────────────┴─────────────────────┘
    

    To simulate the situation you describe, I saved the dataframe to a parquet file with default compression. This parquet file is then used as the input below (input_filepath in the code below).

    For reference, the above dataset consumes 77 GB of RAM when fully loaded in RAM (per Polars estimated_size method). I’m running these benchmarks on a 32-core Threadripper Pro with 504 GB of available RAM.
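
    For concreteness, one way a frame like this could be built (a sketch; not the actual generation code, which isn't shown here) is to tile the four toy rows from the question and attach the extra columns as literals:

    from datetime import datetime

    import polars as pl

    # `temp` is the 4-row toy frame from the question
    base = temp.with_columns(
        string=pl.lit("string"),
        float=pl.lit(1.0, dtype=pl.Float32),
        int=pl.lit(1, dtype=pl.Int32),
        datetime=pl.lit(datetime(2024, 3, 1)),
        date=pl.lit(datetime(2024, 3, 1)),  # shown as datetime[μs] above
    )

    # double the frame 28 times: 4 * 2**28 = 1_073_741_824 rows
    big = base
    for _ in range(28):
        big = pl.concat([big, big])

    big.write_parquet(input_filepath)  # default compression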

    Using collect

    Using the normal collect method with default options, I ran the following code, copied from @Hericks' excellent answer above:

    import polars as pl
    from time import perf_counter
    
    
    start = perf_counter()
    word_list = ["cost", "atm"]
    
    num_dec_expr = pl.element().str.count_matches(r"[0-9_/]").cast(pl.Int32)
    num_letter_expr = pl.element().str.count_matches(r"[A-Za-z]").cast(pl.Int32)
    ratio_expr = (num_letter_expr - num_dec_expr) / (num_letter_expr + num_dec_expr)
    
    (
        pl.scan_parquet(input_filepath)
        .with_columns(
            pl.col("foo")
            .str.to_lowercase()
            .str.replace_all(r"[^a-z0-9\s_\-%]", " ")
            .str.split(" ")
            .list.eval(
                pl.element().filter(
                    pl.element().str.len_chars() > 0,
                    pl.element().is_in(word_list)
                    | num_letter_expr.eq(0)
                    | ratio_expr.is_between(-0.7, 0),
                )
            )
            .list.join(" ")
            .alias("foo_clean")
        )
        .collect()
    )
    
    print("Elapsed time: ", perf_counter() - start)
    

    The runtime performance was as you describe. The algorithm quickly reverts to single-threaded behavior and allocates an inordinate amount of RAM (250 GB, per the top command). I killed the process after waiting for over 15 minutes.

    Using collect(streaming=True, comm_subplan_elim=False)

    By substituting the collect statement with collect(streaming=True, comm_subplan_elim=False), the above completes in a mere 167 seconds. And top shows that the algorithm pushes all 64 logical cores of my Threadripper Pro system to 100%.

    However, the algorithm does continue to consume a large amount of RAM: 160 GB while running, just eyeballing the top command. Still, that certainly is better than the 250 GB of RAM when using collect with default options.

    Using sink_parquet

    By substituting the sink_parquet method for the collect method and thus saving the results directly to disk, the algorithm ran in 459 seconds (about 7.5 minutes). And the RAM topped out at a mere 8 GB while processing the file.
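
    For reference, the substitution looks roughly as follows; it reuses word_list and the expressions defined above, and output_filepath is a placeholder for wherever the cleaned file should be written:

    (
        pl.scan_parquet(input_filepath)
        .with_columns(
            pl.col("foo")
            .str.to_lowercase()
            .str.replace_all(r"[^a-z0-9\s_\-%]", " ")
            .str.split(" ")
            .list.eval(
                pl.element().filter(
                    pl.element().str.len_chars() > 0,
                    pl.element().is_in(word_list)
                    | num_letter_expr.eq(0)
                    | ratio_expr.is_between(-0.7, 0),
                )
            )
            .list.join(" ")
            .alias("foo_clean")
        )
        # stream the result straight to a parquet file instead of collecting into RAM
        .sink_parquet(output_filepath)
    )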

    One thing I’d caution: the runtime behavior showed that it used all 64 logical cores of my system, but not at 100%. This could be due to an I/O bottleneck, but I doubt it. (My system has four Gen4 1 TB NVMe sticks in RAID 0 as working project storage.) As such, I would suggest that using sink_parquet could take longer on a system where I/O bottlenecks are more pronounced.

    Admittedly, streaming the results directly to disk may not be what you want. But it may be what you need to get past this step of your processing with acceptable run-time and memory footprints.

    If any of this helps, well and good. And please continue to give credit for the accepted answer to @Hericks. His answer was spot-on regarding how to use Polars’ native string-manipulation capabilities.
