I have a light Python app which should perform a very simple task, but it keeps crashing due to OOM.
What the app should do
- Load data from .parquet into a dataframe
- Calculate an indicator using the stockstats package
- Merge the freshly calculated data into the original dataframe -> here it crashes
- Store the dataframe as .parquet
Where it crashes
df = pd.merge(df, st, on=['datetime'])
Using
- Python 3.10
- pandas~=2.1.4
- stockstats~=0.4.1
- Kubernetes 1.28.2-do.0 (running on DigitalOcean)
Here is the strange thing: the dataframe is very small (df.size is 208446, file size is 1.00337 MB, memory usage is 1.85537 MB).
Measured
import os
file_stats = os.stat(filename)
file_size = file_stats.st_size / (1024 * 1024) # 1.00337 MB
df_mem_usage = dataframe.memory_usage(deep=True)
df_mem_usage_print = round(df_mem_usage.sum() / (1024 * 1024), 6)  # 1.85537 MB
df_size = dataframe.size # 208446
Deployment info
App is deployed into Kubernetes using Helm with following resources set
resources:
  limits:
    cpu: 1000m
    memory: 6000Mi
  requests:
    cpu: 1000m
    memory: 1000Mi
I am using nodes with 4 vCPU + 8 GB memory and the node is not under performance pressure. I have also created a dedicated node pool with 8 vCPU + 16 GB nodes, but the issue is the same.
kubectl top node test-pool
NAME CPU(cores) CPU% MEMORY(bytes) MEMORY%
test-pool-j8t3y 38m 0% 2377Mi 17%
Pod info
kubectl describe pod xxx
...
State: Waiting
Reason: CrashLoopBackOff
Last State: Terminated
Reason: OOMKilled
Exit Code: 137
Started: Sun, 24 Mar 2024 16:08:56 +0000
Finished: Sun, 24 Mar 2024 16:09:06 +0000
...
Here are the CPU and memory consumption from Grafana. I am aware that very short memory or CPU spikes will be hard to see, but from a long-term perspective the app does not consume a lot of RAM. On the other hand, in my experience we run the same pandas operations on containers with less RAM and much bigger dataframes, with no problems.
How should I fix this?
What else should I debug in order to prevent OOM?
Data and code example
Original dataframe (named df)
datetime open high low close volume
0 2023-11-14 11:15:00 2.185 2.187 2.171 2.187 19897.847314
1 2023-11-14 11:20:00 2.186 2.191 2.183 2.184 8884.634728
2 2023-11-14 11:25:00 2.184 2.185 2.171 2.176 12106.153954
3 2023-11-14 11:30:00 2.176 2.176 2.158 2.171 22904.354082
4 2023-11-14 11:35:00 2.171 2.173 2.167 2.171 1691.211455
New dataframe (named st).
Note: if trend_orientation = 1 => st_lower = NaN; if trend_orientation = -1 => st_upper = NaN
datetime supertrend_ub supertrend_lb trend_orientation st_trend_segment
0 2023-11-14 11:15:00 0.21495 NaN -1 1
1 2023-11-14 11:20:00 0.21495 NaN -10 1
2 2023-11-14 11:25:00 0.21495 NaN -11 1
3 2023-11-14 11:30:00 0.21495 NaN -12 1
4 2023-11-14 11:35:00 0.21495 NaN -13 1
Code example
import pandas as pd
import multiprocessing
import numpy as np
import stockstats


def add_supertrend(market):
    try:
        # Read data from file
        df = pd.read_parquet(market, engine="fastparquet")
        # Extract the date column
        date_column = df['datetime']
        # Convert to stockstats object
        st_a = stockstats.wrap(df.copy())
        # Generate supertrend
        st_a = st_a[['supertrend', 'supertrend_ub', 'supertrend_lb']]
        # Add back the datetime column
        st_a.insert(0, "datetime", date_column)
        # Add trend orientation using conditional columns
        conditions = [
            st_a['supertrend_ub'] == st_a['supertrend'],
            st_a['supertrend_lb'] == st_a['supertrend']
        ]
        values = [-1, 1]
        st_a['trend_orientation'] = np.select(conditions, values)
        # Remove not required supertrend values
        st_a.loc[st_a['trend_orientation'] < 0, 'st_lower'] = np.NaN
        st_a.loc[st_a['trend_orientation'] > 0, 'st_upper'] = np.NaN
        # Unwrap back to dataframe
        st = stockstats.unwrap(st_a)
        # Ensure correct data types are used
        st = st.astype({
            'supertrend': 'float32',
            'supertrend_ub': 'float32',
            'supertrend_lb': 'float32',
            'trend_orientation': 'int8'
        })
        # Add trend segments
        st_to = st[['trend_orientation']]
        st['st_trend_segment'] = st_to.ne(st_to.shift()).cumsum()
        # Remove trend value
        st.drop(columns=['supertrend'], inplace=True)
        # Merge ST with DF
        df = pd.merge(df, st, on=['datetime'])
        # Write back to parquet
        df.to_parquet(market, compression=None)
    except Exception as e:
        # Using a proper logger in real code
        print(e)


def main():
    # Using a fixed market as example; in real code the market is fetched
    market = "BTCUSDT"
    # Using multiprocessing to free up memory after each iteration
    p = multiprocessing.Process(target=add_supertrend, args=(market,))
    p.start()
    p.join()


if __name__ == "__main__":
    main()
Dockerfile
FROM python:3.10

ENV PYTHONFAULTHANDLER=1 \
    PYTHONHASHSEED=random \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=.

# Adding vim
RUN ["apt-get", "update"]

# Get dependencies
COPY requirements.txt .
RUN pip3 install -r requirements.txt

# Copy main app
ADD . .

CMD main.py
Lukasz Tracewski's suggestion
Use node-pressure eviction to test whether the pod can even allocate enough memory on the nodes.
I have done:
- created a new node pool: 8 vCPU + 16 GB RAM
- ensured that only my pod (and some system ones) will be deployed on this node (using tolerations and affinity)
- run a stress test with no OOM or other errors
...
image: "polinux/stress"
command: ["stress"]
args: ["--vm", "1", "--vm-bytes", "5G", "--vm-hang", "1"]
...
kubectl top node test-pool-j8t3y
NAME CPU(cores) CPU% MEMORY(bytes) MEMORY%
test-pool-j8t3y 694m 8% 7557Mi 54%
Node description
Namespace Name CPU Requests CPU Limits Memory Requests Memory Limits Age
--------- ---- ------------ ---------- --------------- ------------- ---
kube-system cilium-24qxl 300m (3%) 0 (0%) 300Mi (2%) 0 (0%) 43m
kube-system cpc-bridge-proxy-csvvg 100m (1%) 0 (0%) 75Mi (0%) 0 (0%) 43m
kube-system csi-do-node-tzbbh 0 (0%) 0 (0%) 0 (0%) 0 (0%) 43m
kube-system disable-systemd-upgrade-timer-mqjsk 0 (0%) 0 (0%) 0 (0%) 0 (0%) 43m
kube-system do-node-agent-dv2z2 102m (1%) 0 (0%) 80Mi (0%) 300Mi (2%) 43m
kube-system konnectivity-agent-wq5p2 0 (0%) 0 (0%) 0 (0%) 0 (0%) 43m
kube-system kube-proxy-gvfrv 0 (0%) 0 (0%) 125Mi (0%) 0 (0%) 43m
scanners data-gw-enrich-d5cff4c95-bkjkc 100m (1%) 1 (12%) 1000Mi (7%) 6000Mi (43%) 2m33s
The pod did not crash due to OOM, so it is very likely that the issue is somewhere inside the code.
Detailed memory monitoring
I have inserted memory measurements at multiple points. I am measuring both dataframe size and overall memory usage using psutil.
import psutil
total = round(psutil.virtual_memory().total / 1000 / 1000, 4)
used = round(psutil.virtual_memory().used / 1000 / 1000, 4)
pct = round(used / total * 100, 1)
logger.info(f"[Current memory usage is: {used} / {total} MB ({pct} %)]")
Memory usage
- prior to reading data from file
  - RAM: 938.1929 MB
- after df loaded
  - df_mem_usage: 1.947708 MB
  - RAM: 954.1181 MB
- after ST generated
  - df_mem_usage of ST df: 1.147757 MB
  - RAM: 944.9226 MB
- line before df merge
  - RAM: 945.4223 MB
Not using multiprocessing
In order to "reset" memory every iteration, I am using multiprocessing. However, I wanted to be sure that this does not cause trouble, so I removed it and called add_supertrend directly. It still ended up in OOM, so I do not think this is the problem.
Real data
As suggested by Lukasz Tracewski, I am sharing the real data that cause the OOM crash. Since they are in parquet format, I cannot use services like pastebin, so I am using GDrive instead. I will use this folder to share anything else related to this question/issue.
Upgrade pandas to 2.2.1
Sometimes a plain package upgrade might help, so I decided to try upgrading pandas to 2.2.1 and also fastparquet to 2024.2.0 (newer pandas requires newer fastparquet). pyarrow was also updated to 15.0.0.
It seemed to work during the first few iterations, but then crashed with OOM again.
Using Dask
I remembered that I used dask whenever I had to solve complex dataframe operations, so I tried it in this case as well. Without success: OOM again. Using dask 2024.3.1.
import dask.dataframe as dd
# mem usage 986.452 MB
ddf1 = dd.from_pandas(df)
# mem usage 1015.37 MB
ddf2 = dd.from_pandas(st)
# mem usage 1019.50 MB
df_dask = dd.merge(ddf1, ddf2, on='datetime')
# mem usage 1021.56 MB
df = df_dask.compute()  # <- here it crashes ¯\_(ツ)_/¯
Duplicated datetimes
While investigating the data with dask, I noticed that there are duplicate records in the datetime column. This is definitely wrong; datetime has to be unique. I think this might cause the issue, since a merge on a key with duplicates matches every duplicate on the left with every duplicate on the right, so the result can explode in size. I will investigate that further.
df.tail(10)
datetime open high low close volume
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
0 2024-02-26 02:55:00 0.234 0.238 0.2312 0.2347 103225.029408
I have implemented a fix that removes duplicate records in the other component that prepares the data. The fix looks like this and I will monitor whether it helps or not.
# Append gathered data to df and write to file
df = pd.concat([df, fresh_data])
# Drop duplicates
df = df.drop_duplicates(subset=["datetime"])
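To catch this earlier, the merge itself can also be made to fail fast instead of silently exploding. A minimal sketch using plain pandas (the validate argument raises if the key is not unique on both sides):

import pandas as pd

# Fail early if the key is not unique on either side, instead of letting
# a many-to-many merge blow up the row count
assert not df['datetime'].duplicated().any(), "duplicate datetimes in df"
assert not st['datetime'].duplicated().any(), "duplicate datetimes in st"

# Alternatively, let pandas enforce uniqueness during the merge itself
df = pd.merge(df, st, on=['datetime'], validate='one_to_one')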
2 Answers
Your Python application is running into an Out-Of-Memory (OOM) crash during the merge operation with Pandas.
Even though the dataframes are relatively small, the crash indicates that the memory spikes are occurring during the merge operation. The CPU and memory usage stats from Grafana suggest that under normal operations, the resource usage is within limits, but a merge operation can cause a spike that exceeds the available memory.
You could try to reduce memory usage during the merge by breaking the operation into smaller chunks if possible, and explicitly delete temporary dataframes or variables that are no longer in use to free up memory; a rough sketch of the chunked approach follows.
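As an illustration only (the helper name and chunk size are invented for this sketch, and it assumes an inner merge on a single key):

import gc
import pandas as pd

def merge_in_chunks(df, st, key="datetime", chunk_size=50_000):
    """Merge st into df in row chunks to cap peak memory."""
    parts = []
    for start in range(0, len(st), chunk_size):
        chunk = st.iloc[start:start + chunk_size]
        parts.append(pd.merge(df, chunk, on=key))
        del chunk
        gc.collect()  # explicitly free intermediates between chunks
    return pd.concat(parts, ignore_index=True)

# df = merge_in_chunks(df, st)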
More generally, monitor memory usage step by step to pinpoint exactly when the spike occurs, and implement logging to capture the state of the process. By adding logging at each step, you can check the console output to see the memory usage right before the crash happens; a small helper is sketched below.
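A minimal sketch, reusing psutil as in the question (the helper name log_mem is made up here):

import logging
import psutil

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_mem(step: str) -> None:
    """Log current system memory usage for a labelled step."""
    vm = psutil.virtual_memory()
    logger.info("[%s] used %.1f / %.1f MB (%.1f%%)",
                step, vm.used / 1e6, vm.total / 1e6, vm.percent)

# log_mem("before merge")
# df = pd.merge(df, st, on=['datetime'])
# log_mem("after merge")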
As Lukasz Tracewski suggests in the comments, a sanity check would be to make sure that the Kubernetes environment and the app's configuration can actually handle the memory allocation as expected.
You can allocate a substantial amount of memory (5 GB) to see if the Kubernetes environment handles it as expected. If the test succeeds without an OOM crash, then the issue may not be with the Kubernetes configuration, but possibly with how memory is being handled in the Python application itself or with how the pandas merge operation is being performed. Create a file named memory-stress-test.yaml and run kubectl apply -f memory-stress-test.yaml. Then monitor the pod's status with kubectl get pod memory-stress-test and review any events with kubectl describe pod memory-stress-test. If the environment passes this test, the problem is likely within the code or data handling and not with the container or node configuration.
The new error message and eviction status suggest that the Kubernetes node on which the pod is scheduled runs out of memory, triggering a node-pressure eviction. The memory consumption of the data-gw-enrich container is significantly larger than the request of 4000 MiB, which indicates that either the container's memory requirement is underestimated, or there is an inefficiency or memory leak in the application. The Kubernetes node-pressure eviction documentation indicates that the kubelet may proactively terminate pods to reclaim resources on nodes; when a pod is evicted for node pressure, it is because the kubelet has determined that the node lacks sufficient free memory to maintain system stability.
From your last error message: if your workload typically needs more memory and your nodes are close to their memory limits, consider adding more nodes with larger memory capacity to your cluster or resizing the existing nodes.
If you add a new node with more memory, you can use node affinity to schedule your pod specifically on that node. That way, your memory-intensive pod will not affect other pods in the cluster. Label your node (kubectl label nodes <node-name> high-memory=true) and update your pod specification with a matching node affinity rule.
For setting aside system resources, you can adjust your kubelet configuration to specify --system-reserved and --kube-reserved. That can be done in the kubelet config file or via command-line flags.

The stockstats.StockDataFrame is a subclass of pd.DataFrame.
This implies there is no need to do a merge in the end.
After making sure the "datetime" column in the original df contains no duplicates, you could define the st dataframe by passing in df (instead of df.copy()) and using the existing datetime column as the index. (If you don't do this, stockstats will look for a date column, which doesn't exist here.) There is also no need to have a separate st_a variable; all dataframe operations can be performed on st directly (dropping any unneeded columns when you're done).
Then, in the end, there is no need to do an expensive merge or even to unwrap the st frame, since it is already an instance of pd.DataFrame (containing the original data), so you can call to_parquet on it directly. A sketch of both changes follows.
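This is only a sketch; it assumes the installed stockstats version supports the index_column argument of stockstats.wrap (if it doesn't, setting datetime as the index before wrapping achieves the same), and it elides the trend columns from the original code:

import pandas as pd
import stockstats

df = pd.read_parquet(market, engine="fastparquet")

# Wrap df itself (no .copy()) and point stockstats at the existing
# datetime column instead of the default "date" column
st = stockstats.wrap(df, index_column="datetime")

# Accessing the indicator columns computes them and caches them on st in place
st[['supertrend', 'supertrend_ub', 'supertrend_lb']]

# ... add trend_orientation / st_trend_segment directly on st here ...

# st is already a pd.DataFrame (it still contains the original columns),
# so no merge and no unwrap are needed before writing it out
st.reset_index().to_parquet(market, compression=None)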
The only difference at the end is that the st dataframe will have lowercased all the column names. This is not a problem for your example df, but if this was not what you wanted, you could restore the original column names again, for instance by creating a little mapping in the beginning (as sketched below) or by other means (see: stockstats.py#retype).
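For instance (illustrative only):

# Remember the original column names before stockstats lowercases them
original_names = {name.lower(): name for name in df.columns}

# ... indicator calculations on st ...

# Restore the original casing on the result
st = st.rename(columns=original_names)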
stocktrace.unwrap
you would again implicitly create a new full copy. The problem with using merge (the way it’s used in the original code with an expliciton=...
) is also that you are essentially merging a dataframe with itself (or something that is close to a copy) — all the original columns, apart from theon
column, will be duplicated in the merged output (which is probably not what you want). This could also aggravate memory issues.