I have a column of long strings (like sentences) on which I want to do the following:
- replace certain characters
- create a list of the remaining strings
- if a string is all text see whether it is in a dictionary and if so keep it
- if a string is all numeric keep it
- if a string is a mix of numeric/text, find ratio of numbers to letters and keep if above a threshold
I currently do this as follows:
for memo_field in self.memo_columns:
    data = data.with_columns(
        pl.col(memo_field).map_elements(
            lambda x: self.filter_field(text=x, word_dict=word_dict))
    )
The filter_field method uses plain Python:
- text_sani = re.sub(r'[^a-zA-Z0-9\s_\-%]', ' ', text) to replace characters,
- text_sani.split(' ') to split into words,
- len(re.findall(r'[A-Za-z]', x)) to count the letters in each element of the resulting list (similarly for digits); the ratio is the difference between the two counts divided by their sum, and
- a list comprehension with an if clause to filter the list of words.
It actually isn’t too bad: 128M rows take about 10 minutes. Unfortunately, future files will be much bigger. On a ~300M-row file this approach gradually increases memory consumption until the OS (Ubuntu) kills the process. Also, all processing seems to take place on a single core.
I have started trying to use the Polars string expressions; code and a toy example are provided below.
At this point it looks like my only option is a function call to do the rest. My questions are:
- In my original approach, is it normal that memory consumption grows? Does map_elements create a copy of the original series and so consume more memory?
- Is my original approach correct, or is there a better way? E.g., I have just started reading about struct in Polars.
- Is it possible to do what I want using just Polars expressions?
UPDATE
The code examples in the answers from @Hericks and @ΩΠΟΚΕΚΡΥΜΜΕΝΟΣ were applied and largely addressed my third question. Implementing the Polars expressions greatly reduced run time, with two observations:
- the complexity of the memo fields in my use case greatly affects the run time. The key challenge is the lookup of items in the dictionary; a large dictionary and many valid words in the memo field can severely affect run time; and
- I experienced many seg-fault errors when saving in .parquet format when I used pl.DataFrame. When using pl.LazyFrame and sink_parquet there were no errors, but run time was greatly extended (drives are NVMe SSDs at 2000 MB/s); the two saving paths are sketched below.
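The two saving paths compared above look roughly like this (a minimal sketch; the file names and the stand-in cleaning expression are placeholders):
import polars as pl

# Stand-in for the real cleaning expression; file names are hypothetical.
clean = pl.col("foo").str.to_lowercase().alias("clean_foo")

# Eager path (pl.DataFrame): the full result is materialised in RAM before writing.
pl.read_parquet("memos.parquet").with_columns(clean).write_parquet("memos_clean.parquet")

# Lazy path (pl.LazyFrame): the result is streamed to disk via sink_parquet.
pl.scan_parquet("memos.parquet").with_columns(clean).sink_parquet("memos_clean.parquet")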
EXAMPLE CODE/DATA:
Toy data:
import polars as pl

temp = pl.DataFrame({
    "foo": [
        'COOOPS.autom.SAPF124',
        'OSS REEE PAAA comp. BEEE atm 6079 19000000070 04-04-2023',
        'ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO',
        'OSS REFF pagopago cost. Becf atm 9682 50012345726 10-04-2023',
    ]
})
Code Functions:
import re

def num_dec(x):
    # count digits plus '_' and '/'
    return len(re.findall(r'[0-9_/]', x))

def num_letter(x):
    # count letters
    return len(re.findall(r'[A-Za-z]', x))

def letter_dec_ratio(x):
    if len(x) == 0:
        return None
    nl = num_letter(x)
    nd = num_dec(x)
    if (nl + nd) == 0:
        return None
    ratio = (nl - nd) / (nl + nd)
    return ratio
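# Illustrative values from the helpers above:
# letter_dec_ratio("atm")        -> 1.0  (all letters)
# letter_dec_ratio("04-04-2023") -> -1.0 (all digits)
# letter_dec_ratio("SAPF124")    -> (4 - 3) / (4 + 3) ≈ 0.14 (mixed)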
def filter_field(text=None, word_dict=None):
    if type(text) is not str or word_dict is None:
        return 'no memo and/or dictionary'
    if len(text) > 100:
        text = text[0:101]
    print("TEXT: ", text)
    text_sani = re.sub(r'[^a-zA-Z0-9\s_\-%]', ' ', text)  # parse by replacing most artifacts and symbols with space
    words = text_sani.split(' ')  # create words separated by spaces
    print("WORDS: ", words)
    ratios = [letter_dec_ratio(w) for w in words]
    kept = [
        w.lower()
        for i, w in enumerate(words)
        if ratios[i] is not None
        and ((ratios[i] == -1 or (-0.7 <= ratios[i] <= 0)) or (ratios[i] == 1 and w.lower() in word_dict))
    ]
    print("FINAL: ", ' '.join(kept))
    return ' '.join(kept)
Code Current Implementation:
temp.with_columns(
    pl.col("foo").map_elements(
        lambda x: filter_field(text=x, word_dict=['cost', 'atm'])
    ).alias('clean_foo')  # baseline
)
Code Partial Attempt w/Polars:
This gets me the correct WORDS (see the expected results below):
temp.with_columns(
    (
        pl.col("foo")
        .str.replace_all(r'[^a-zA-Z0-9\s_\-%]', ' ')
        .str.split(' ')
    )
)
Expected Result (at each step, see the print statements above):
TEXT: COOOPS.autom.SAPF124
WORDS: ['COOOPS', 'autom', 'SAPF124']
FINAL:
TEXT: OSS REEE PAAA comp. BEEE atm 6079 19000000070 04-04-2023
WORDS: ['OSS', 'REEE', 'PAAA', 'comp', '', 'BEEE', '', 'atm', '6079', '19000000070', '04-04-2023']
FINAL: atm 6079 19000000070 04-04-2023
TEXT: ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO
WORDS: ['ABCD', '600000000397', '7667896-6', 'REG', 'REF', 'REE', 'PREPREO', 'HMO']
FINAL: 600000000397 7667896-6
TEXT: OSS REFF pagopago cost. Becf atm 9682 50012345726 10-04-2023
WORDS: ['OSS', 'REFF', 'pagopago', 'cost', '', 'Becf', '', 'atm', '9682', '50012345726', '10-04-2023']
FINAL: cost atm 9682 50012345726 10-04-2023
Answers
The filtering can be implemented using polars’ native expression API as follows. I’ve taken the regular expressions from the naive implementation in the question.
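A sketch of such an expression-based implementation, reproducing the expected output from the question (the helper name clean_memo_expr is illustrative, and str.count_matches/list.eval assume a reasonably recent Polars version):
import polars as pl

def clean_memo_expr(col: str, word_dict: list[str]) -> pl.Expr:
    # Per-word letter/digit counts, mirroring num_letter/num_dec from the question.
    nl = pl.element().str.count_matches(r"[A-Za-z]").cast(pl.Int64)
    nd = pl.element().str.count_matches(r"[0-9_/]").cast(pl.Int64)
    ratio = (nl - nd) / (nl + nd)
    keep = ((nl + nd) != 0) & (
        (ratio == -1)
        | ((ratio >= -0.7) & (ratio <= 0))
        | ((ratio == 1) & pl.element().str.to_lowercase().is_in(word_dict))
    )
    return (
        pl.col(col)
        .str.slice(0, 101)                            # same truncation as filter_field
        .str.replace_all(r"[^a-zA-Z0-9\s_\-%]", " ")  # replace artifacts/symbols with spaces
        .str.split(" ")                               # split into words
        .list.eval(pl.element().filter(keep).str.to_lowercase())
        .list.join(" ")
    )

temp.with_columns(clean_memo_expr("foo", ["cost", "atm"]).alias("clean_foo"))
The counts are cast to Int64 so that the letter-minus-digit difference can go negative, mirroring letter_dec_ratio.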
Let’s see if I can help with the performance concerns. There may be a way to get your algorithm to run in a performance envelope that is acceptable for your system.
For the benchmarking below, I created the following dataset of over a billion records, along with some extra columns (to simulate data other than the string column that is being processed).
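As a rough illustration of the setup (the row count, the extra column names, and input_filepath below are stand-ins, not the exact benchmark code):
import polars as pl

strings = [
    "COOOPS.autom.SAPF124",
    "OSS REEE PAAA comp. BEEE atm 6079 19000000070 04-04-2023",
    "ABCD 600000000397/7667896-6/REG.REF.REE PREPREO/HMO",
    "OSS REFF pagopago cost. Becf atm 9682 50012345726 10-04-2023",
]
n_rows = 100_000_000                       # the benchmark below used > 1 billion rows; scale to taste
input_filepath = "memo_benchmark.parquet"  # hypothetical path

df = pl.DataFrame(
    {
        "foo": pl.Series(strings).sample(n_rows, with_replacement=True, seed=0),
        "id": pl.arange(0, n_rows, eager=True),                    # extra non-string column
        "amount": (pl.arange(0, n_rows, eager=True) % 997) * 1.5,  # extra non-string column
    }
)
df.write_parquet(input_filepath)  # default compression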
To simulate the situation you describe, I saved the dataframe to a parquet file with default compression. This parquet file is then used as the input below (input_filepath in the code below). For reference, the above dataset consumes 77 GB of RAM when fully loaded in RAM (per Polars’ estimated_size method). I’m running these benchmarks on a 32-core Threadripper Pro with 504 GB of available RAM.
Using collect
Using the normal collect method with default options, I ran the following code, copied from @Hericks’ excellent answer above:
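In outline (assuming the clean_memo_expr helper and the input_filepath from the sketches above):
import polars as pl

word_dict = ["cost", "atm"]  # illustrative dictionary

result = (
    pl.scan_parquet(input_filepath)  # lazily read the benchmark parquet file
    .with_columns(clean_memo_expr("foo", word_dict).alias("clean_foo"))
    .collect()                       # default, fully in-memory collect
)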
The runtime performance was as you describe. The algorithm quickly reverts to single-threaded behavior and allocates an inordinate amount of RAM (250 GB, per the top command). I killed the process after waiting for over 15 minutes.
Using collect(streaming=True, comm_subplan_elim=False)
By substituting the collect statement with collect(streaming=True, comm_subplan_elim=False), the above completes in a mere 167 seconds. And top shows that the algorithm pushes all 64 logical cores of my Threadripper Pro system to 100%.
However, the algorithm does continue to consume a large amount of RAM: 160 GB while running, just eyeballing the top command. Still, that certainly is better than the 250 GB of RAM when using collect with default options.
Using sink_parquet
By substituting the sink_parquet method for the collect method, and thus saving the results directly to disk, the algorithm ran in 459 seconds (about 7.5 minutes). And the RAM topped out at a mere 8 GB while processing the file.
One thing I’d caution: the runtime behavior showed that it used all 64 logical cores of my system, but not to 100%. This could be due to an I/O bottleneck, but I doubt it. (My system has four Gen4 1TB NVMe sticks in RAID 0 as working project storage.) As such, I would suggest that using sink_parquet could take longer on a system where I/O bottlenecks are more pronounced.
Admittedly, streaming the results directly to disk may not be what you want. But it may be what you need to get past this step of your processing with acceptable run-time and memory footprints.
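For reference, a minimal sketch of that sink variant, under the same assumptions as the snippets above:
import polars as pl

(
    pl.scan_parquet(input_filepath)
    .with_columns(clean_memo_expr("foo", ["cost", "atm"]).alias("clean_foo"))
    .sink_parquet("cleaned_memos.parquet")  # stream results straight to disk
)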
If any of this helps, well and good. And please continue to give credit for the accepted answer to @Hericks. His answer was spot-on regarding how to use Polars’ native string-manipulation capabilities.