
I have a light Python app which should perform a very simple task, but it keeps crashing due to OOM.

What the app should do

  1. Load data from a .parquet file into a dataframe
  2. Calculate an indicator using the stockstats package
  3. Merge the freshly calculated data into the original dataframe -> here it crashes
  4. Store the dataframe as .parquet

Where it crashes

df = pd.merge(df, st, on=['datetime'])

Using

  • Python 3.10
  • pandas~=2.1.4
  • stockstats~=0.4.1
  • Kubernetes 1.28.2-do.0 (running in Digital Ocean)

Here is the strange thing: the dataframe is very small (df.size is 208446, file size is 1.00337 MB, memory usage is 1.85537 MB).

Measured

import os

file_stats = os.stat(filename)
file_size = file_stats.st_size / (1024 * 1024)  # 1.00337 MB

df_mem_usage = dataframe.memory_usage(deep=True)
df_mem_usage_print = round(df_mem_usage.sum() / (1024 * 1024), 6)  # 1.85537 MB

df_size = dataframe.size  # 208446

Deployment info

The app is deployed into Kubernetes using Helm with the following resources set:

resources:
  limits:
    cpu: 1000m
    memory: 6000Mi
  requests:
    cpu: 1000m
    memory: 1000Mi

I am using nodes with 4 vCPU + 8 GB memory and the node is not under performance pressure. I have also created a dedicated node pool with 8 vCPU + 16 GB nodes, but the same issue occurs.

kubectl top node test-pool
NAME              CPU(cores)   CPU%   MEMORY(bytes)   MEMORY%
test-pool-j8t3y   38m          0%     2377Mi          17%

Pod info

kubectl describe pod xxx
...
    State:          Waiting
      Reason:       CrashLoopBackOff
    Last State:     Terminated
      Reason:       OOMKilled
      Exit Code:    137
      Started:      Sun, 24 Mar 2024 16:08:56 +0000
      Finished:     Sun, 24 Mar 2024 16:09:06 +0000
...

Here is the CPU and memory consumption from Grafana. I am aware that very short memory or CPU spikes will be hard to see, but from a long-term perspective the app does not consume a lot of RAM. On the other hand, in my experience we use the same pandas operations on containers with less RAM and much bigger dataframes, with no problems.

[Grafana stats screenshot]

How should I fix this?
What else should I debug in order to prevent OOM?

Data and code example

Original dataframe (named df)

              datetime   open   high    low  close        volume
0  2023-11-14 11:15:00  2.185  2.187  2.171  2.187  19897.847314
1  2023-11-14 11:20:00  2.186  2.191  2.183  2.184   8884.634728
2  2023-11-14 11:25:00  2.184  2.185  2.171  2.176  12106.153954
3  2023-11-14 11:30:00  2.176  2.176  2.158  2.171  22904.354082
4  2023-11-14 11:35:00  2.171  2.173  2.167  2.171   1691.211455

New dataframe (named st).
Note: If trend_orientation = 1 => st_lower = NaN, if -1 => st_upper = NaN

              datetime   supertrend_ub  supertrend_lb    trend_orientation    st_trend_segment
0  2023-11-14 11:15:00   0.21495        NaN              -1                   1
1  2023-11-14 11:20:00   0.21495        NaN              -10                  1
2  2023-11-14 11:25:00   0.21495        NaN              -11                  1
3  2023-11-14 11:30:00   0.21495        NaN              -12                  1
4  2023-11-14 11:35:00   0.21495        NaN              -13                  1

Code example

import pandas as pd
import multiprocessing
import numpy as np
import stockstats


def add_supertrend(market):
    try:
        # Read data from file
        df = pd.read_parquet(market, engine="fastparquet")

        # Extract date columns
        date_column = df['datetime']

        # Convert to stockstats object
        st_a = stockstats.wrap(df.copy())
        # Generate supertrend
        st_a = st_a[['supertrend', 'supertrend_ub', 'supertrend_lb']]

        # Add back datetime columns
        st_a.insert(0, "datetime", date_column)

        # Add trend orientation using conditional columns
        conditions = [
            st_a['supertrend_ub'] == st_a['supertrend'],
            st_a['supertrend_lb'] == st_a['supertrend']
        ]
        
        values = [-1, 1]
        st_a['trend_orientation'] = np.select(conditions, values)

        # Remove not required supertrend values
        st_a.loc[st_a['trend_orientation'] < 0, 'st_lower'] = np.nan
        st_a.loc[st_a['trend_orientation'] > 0, 'st_upper'] = np.nan

        # Unwrap back to dataframe
        st = stockstats.unwrap(st_a)

        # Ensure correct date types are used
        st = st.astype({
            'supertrend': 'float32',
            'supertrend_ub': 'float32',
            'supertrend_lb': 'float32',
            'trend_orientation': 'int8'
        })
        # Add trend segments
        st_to = st[['trend_orientation']]
        st['st_trend_segment'] = st_to.ne(st_to.shift()).cumsum()
        
        # Remove trend value
        st.drop(columns=['supertrend'], inplace=True)

        # Merge ST with DF
        df = pd.merge(df, st, on=['datetime'])
        
        # Write back to parquet
        df.to_parquet(market, compression=None)
    except Exception as e:
        # Using proper logger in real code
        print(e)


def main():
    # Using fixed market as example, in real code market is fetched
    market = "BTCUSDT"
    # Using multiprocessing to free up memory after each iteration
    p = multiprocessing.Process(target=add_supertrend, args=(market,))
    p.start()
    p.join()


if __name__ == "__main__":
    main()

Dockerfile

FROM python:3.10

ENV PYTHONFAULTHANDLER=1 \
    PYTHONHASHSEED=random \
    PYTHONUNBUFFERED=1 \
    PYTHONPATH=.

# Update package index (so debug tools such as vim can be installed)
RUN ["apt-get", "update"]

# Get dependencies
COPY requirements.txt .
RUN pip3 install -r requirements.txt

# Copy main app
ADD . .
CMD ["python", "main.py"]

Lukasz Tracewski's suggestion

Use node-pressure eviction testing to check whether the pod can even allocate enough memory on the node.

I have done:

  • created a new node pool: 8 vCPU + 16 GB RAM
  • ensured that only my pod (and some system ones) would be deployed on this node (using tolerations and affinity)
  • ran a stress test with no OOM or other errors
...
          image: "polinux/stress"
          command: ["stress"]
          args: ["--vm", "1", "--vm-bytes", "5G", "--vm-hang", "1"]
...
kubectl top node test-pool-j8t3y
NAME              CPU(cores)   CPU%   MEMORY(bytes)   MEMORY%
test-pool-j8t3y   694m         8%     7557Mi          54%

Node description

  Namespace                   Name                                   CPU Requests  CPU Limits  Memory Requests  Memory Limits  Age
  ---------                   ----                                   ------------  ----------  ---------------  -------------  ---
  kube-system                 cilium-24qxl                           300m (3%)     0 (0%)      300Mi (2%)       0 (0%)         43m
  kube-system                 cpc-bridge-proxy-csvvg                 100m (1%)     0 (0%)      75Mi (0%)        0 (0%)         43m
  kube-system                 csi-do-node-tzbbh                      0 (0%)        0 (0%)      0 (0%)           0 (0%)         43m
  kube-system                 disable-systemd-upgrade-timer-mqjsk    0 (0%)        0 (0%)      0 (0%)           0 (0%)         43m
  kube-system                 do-node-agent-dv2z2                    102m (1%)     0 (0%)      80Mi (0%)        300Mi (2%)     43m
  kube-system                 konnectivity-agent-wq5p2               0 (0%)        0 (0%)      0 (0%)           0 (0%)         43m
  kube-system                 kube-proxy-gvfrv                       0 (0%)        0 (0%)      125Mi (0%)       0 (0%)         43m
  scanners                    data-gw-enrich-d5cff4c95-bkjkc         100m (1%)     1 (12%)     1000Mi (7%)      6000Mi (43%)   2m33s

The pod did not crash due to OOM, so it is very likely that the issue is somewhere inside the code.

Detailed memory monitoring

I have inserted memory measurements at multiple points, measuring both dataframe size and overall memory usage with psutil.

import psutil

total = round(psutil.virtual_memory().total / 1000 / 1000, 4)
used = round(psutil.virtual_memory().used / 1000 / 1000, 4)
pct = round(used / total * 100, 1)
logger.info(f"[Current memory usage is: {used} / {total} MB ({pct} %)]")

Memory usage

  • prior to reading data from file
    • RAM: 938.1929 MB
  • after df loaded
    • df_mem_usage: 1.947708 MB
    • RAM: 954.1181 MB
  • after ST generated
    • df_mem_usage of ST df: 1.147757 MB
    • RAM: 944.9226 MB
  • line before df merge
    • RAM: 945.4223 MB
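
Since short allocation spikes are easy to miss with sampled metrics like Grafana or psutil polling, Python's built-in tracemalloc module can report the peak allocation around a single step. A minimal sketch; measure_peak is a hypothetical helper and tracemalloc may not capture every native allocation, though numpy registers its array buffers with it:

import tracemalloc

def measure_peak(fn, *args, **kwargs):
    """Run fn and report the peak memory seen by the Python allocator."""
    tracemalloc.start()
    try:
        result = fn(*args, **kwargs)
        current, peak = tracemalloc.get_traced_memory()
        print(f"current: {current / 1e6:.1f} MB, peak: {peak / 1e6:.1f} MB")
        return result
    finally:
        tracemalloc.stop()

# e.g. wrap the suspect step:
# df = measure_peak(pd.merge, df, st, on=['datetime'])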

Not using multiprocessing

In order to "reset" memory every iteration, I am using multiprocessing. However I wanted to be sure that this does not cause troubles. I have removed it and called the add_supertrend directly. But it ended up in OOM, so I do not think this is the problem.

Real data

As suggested by Lukasz Tracewski, I am sharing the real data that causes the OOM crash. Since the data is in parquet format, I cannot use services like pastebin, so I am using GDrive instead. I will use this folder to share anything else related to this question/issue.

Upgrade pandas to 2.2.1

Sometimes a plain package upgrade might help, so I decided to try upgrading pandas to 2.2.1 and fastparquet to 2024.2.0 (newer pandas requires newer fastparquet). pyarrow was also updated to 15.0.0.

It seemed to work for the first few iterations, but then crashed with OOM again.

Using Dask

I remembered that when I used to solve complex dataframe operations, I used Dask, so I tried it in this case as well. Without success: OOM again. Using dask 2024.3.1.

import dask.dataframe as dd
# mem usage 986.452 MB
ddf1 = dd.from_pandas(df)
# mem usage 1015.37 MB
ddf2 = dd.from_pandas(st)
# mem usage 1019.50 MB
df_dask = dd.merge(ddf1, ddf2, on='datetime')
# mem usage 1021.56 MB
df = df_dask.compute()  # <- here it crashes ¯\_(ツ)_/¯

Duplicated datetimes

While investigating the data with Dask, I noticed that there are duplicate records in the datetime column. This is definitely wrong; datetime has to be unique. I think this might be causing the issue, and I will investigate it further.

df.tail(10)
             datetime   open   high     low   close         volume
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
0 2024-02-26 02:55:00  0.234  0.238  0.2312  0.2347  103225.029408
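
A quick way to quantify the duplication before fixing it (a sketch, assuming the same parquet file as in the code example above):

import pandas as pd

df = pd.read_parquet("BTCUSDT", engine="fastparquet")

# rows whose datetime occurs more than once
dupes = df['datetime'].duplicated(keep=False).sum()
print(f"{dupes} of {len(df)} rows have a non-unique datetime")

# the most-repeated timestamps
print(df['datetime'].value_counts().head())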

I have implemented a fix in the other component that prepares the data, removing the duplicate records. The fix looks like this; I will monitor whether it helps.

    # Append gathered data to df and write to file
    df = pd.concat([df, fresh_data])

    # Drop duplicates
    df = df.drop_duplicates(subset=["datetime"])
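
For context on why the duplicates matter so much here: pandas merge performs a many-to-many join, so a key that appears k times on both sides produces k * k output rows, and with thousands of duplicated timestamps a ~1 MB frame can blow up to gigabytes. A toy demonstration of the blow-up; the commented-out validate call is pandas' built-in guard that fails fast instead of exploding:

import pandas as pd

key = ['2024-02-26 02:55:00'] * 1000
left = pd.DataFrame({'datetime': key, 'open': 0.234})
right = pd.DataFrame({'datetime': key, 'trend_orientation': -1})

# many-to-many join: 1000 x 1000 = 1,000,000 rows from 2,000 input rows
merged = pd.merge(left, right, on=['datetime'])
print(len(merged))  # 1000000

# fail fast instead: raises pandas.errors.MergeError because
# 'datetime' is not unique on both sides
# pd.merge(left, right, on=['datetime'], validate='one_to_one')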


Answers


  1. Your Python application is running into an Out-Of-Memory (OOM) crash during the merge operation with Pandas.

    +---------------+       +-------------+        +--------------+          +-------------------+
    | Load .parquet |  -->  | Calculate   |  --->  | Merge Data   |  --X-->  | Store .parquet    |
    |  to DataFrame |       | Indicator   |        | Frames       |          | (Crashes here)    |
    +---------------+       +-------------+        +--------------+          +-------------------+
    

    Even though the dataframes are relatively small, the crash indicates that a memory spike occurs during the merge operation. The CPU and memory usage stats from Grafana suggest that under normal operation resource usage is within limits, but a merge can cause a spike that exceeds the available memory.

    You could try to reduce memory usage during the merge by breaking the operation into smaller chunks if possible, and explicitly delete temporary dataframes or variables that are no longer in use to free up memory, as sketched below.
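
    A minimal sketch of both ideas, assuming the df and st frames from the question; merge_in_chunks is a hypothetical helper, not part of pandas. Note that chunking only caps the size of each intermediate piece: if the join keys are duplicated, the final result still explodes.

    import gc
    import pandas as pd

    def merge_in_chunks(df, st, key='datetime', chunk_size=50_000):
        """Merge st into df in row chunks so that only one slice of the
        merged result is materialised at a time."""
        pieces = []
        for start in range(0, len(df), chunk_size):
            chunk = df.iloc[start:start + chunk_size]
            pieces.append(pd.merge(chunk, st, on=[key]))
            del chunk  # drop the reference so the slice can be reclaimed
            gc.collect()
        return pd.concat(pieces, ignore_index=True)

    # usage: df = merge_in_chunks(df, st)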

    More generally, monitor memory usage step by step to pinpoint exactly when the spike occurs, and implement logging to capture the state of the process:

    import pandas as pd
    import multiprocessing
    import numpy as np
    import stockstats
    import os
    
    # Add a memory usage logger function:
    def log_memory_usage(df, step):
        mem = df.memory_usage(deep=True).sum() / (1024 * 1024)  # in MB
        print(f'Memory usage after {step}: {mem:.2f} MB')
    
    def add_supertrend(market):
        try:
            df = pd.read_parquet(market, engine="fastparquet")
            log_memory_usage(df, 'loading dataframe')
    
            # ... perform the rest of the operations as before ...
    
            # After generating supertrend data
            log_memory_usage(st, 'after generating supertrend')
    
            # Before merge operation
            log_memory_usage(df, 'before merge')
    
            df = pd.merge(df, st, on=['datetime'])
    
            # After merge operation
            log_memory_usage(df, 'after merge')
    
            # Save the result
            df.to_parquet(market, compression=None)
    
        except Exception as e:
            print(e)
    
    # main() and if __name__ == "__main__": block remains the same
    
    

    By adding logging at each step, you can check the console output to see the memory usage right before the crash happens.


    As Lukasz Tracewski suggests in the comments, a sanity check would make sure that the Kubernetes environment and the app’s configuration can actually handle the memory allocation as expected.

    You can allocate a substantial amount of memory (5 GB) to see whether the Kubernetes environment handles it as expected. If the test succeeds without an OOM crash, then the issue may not be with the Kubernetes configuration but with how memory is being handled in the Python application itself, or with how the pandas merge operation is performed.

    Create a file named memory-stress-test.yaml and run kubectl apply -f memory-stress-test.yaml.

    apiVersion: v1
    kind: Pod
    metadata:
      name: memory-stress-test
    spec:
      containers:
      - name: memory-stress-test
        image: polinux/stress
        resources:
          limits:
            memory: "6000Mi"
          requests:
            memory: "5000Mi"
        command: ["stress"]
        args: ["--vm", "1", "--vm-bytes", "5G", "--vm-hang", "1"]
    

    Then monitor the Pod’s status with kubectl get pod memory-stress-test and review any events with kubectl describe pod memory-stress-test.

    If the environment passes this test, the problem is likely within the code or data handling and not with the container or node configuration.


    The node was low on resource: memory. Threshold quantity: 100Mi, available: 88864Ki. Container data-gw-enrich was using 6023732Ki, request is 4000Mi, has larger consumption of memory.

    The new error message and eviction status suggest that the Kubernetes node on which the pod is scheduled runs out of memory, triggering a node-pressure eviction. The memory consumption by the data-gw-enrich container is significantly larger than the request of 4000 MiB, which indicates that either the container’s memory requirement is underestimated, or there is an inefficiency or memory leak in the application.

    The message from the Kubernetes node-pressure eviction documentation indicates that the kubelet may proactively terminate pods to reclaim resources on nodes. When a pod is evicted for node pressure, it is because the kubelet has determined that the node lacks sufficient free memory to maintain system stability.

    From your last error message:

    • The available memory on the node is around 86.78 MiB, which is below the defined threshold.
    • The container was using approximately 5882.55 MiB, which is more than the request of 4000 MiB.

    If your workload typically needs more memory and your nodes are close to their memory limits, consider adding nodes with larger memory capacity to your cluster or resizing the existing ones. If you add a new node with more memory, you can use node affinity to schedule your pod specifically on that node, so your memory-intensive pod will not affect other pods in the cluster. Label your node (kubectl label nodes <node-name> high-memory=true) and update your pod specification to use node affinity:

    apiVersion: v1
    kind: Pod
    metadata:
      name: memory-intensive-pod
    spec:
      containers:
      - name: app-container
        image: your-image
      affinity:
        nodeAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
            nodeSelectorTerms:
            - matchExpressions:
              - key: high-memory
                operator: In
                values:
                - "true"
    

    To set aside system resources, adjust your kubelet configuration to specify --system-reserved and --kube-reserved. This can be done in the kubelet config file or via command-line flags.

    # /var/lib/kubelet/config.yaml
    apiVersion: kubelet.config.k8s.io/v1beta1
    kind: KubeletConfiguration
    systemReserved:
      cpu: "500m"
      memory: "1Gi"
    kubeReserved:
      cpu: "500m"
      memory: "1Gi"
    
  2. The stockstats.StockDataFrame is a subclass of pd.DataFrame.
    This implies there is no need to do a merge in the end.

    After making sure the "datetime" column in the original df contains no duplicates, you could define the st dataframe as

        st = stockstats.wrap(df, index_column="datetime")
    

    So, passing in df instead of df.copy() and using the existing datetime column as index. (If you don’t do this, stockstats will look for a date column which doesn’t exist here.)

    There is also no need for a separate st_a variable; all dataframe operations could be performed on st (dropping any unneeded columns when you’re done).

    Then in the end, there is no need to do an expensive merge or even to unwrap the st frame, since it is already an instance of pd.DataFrame (containing the original data), so you can directly call:

        st.reset_index().to_parquet(market, compression=None)
        # reset_index since you do want the `datetime` column back as regular column
    

    The only difference at the end is that the st dataframe will have lowercased all the column names. This is not a problem for your example df, but if it is not what you want, you can restore the original column names, for instance by creating a little mapping at the beginning or by other means (see: stockstats.py#retype).

    In your original code you are explicitly copying the df once, and if you were to use stockstats.unwrap you would implicitly create another full copy. The problem with using merge the way it is used in the original code (with an explicit on=...) is also that you are essentially merging a dataframe with something close to a copy of itself: all the original columns apart from the on column will be duplicated in the merged output, which is probably not what you want. This can also aggravate memory issues.
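
    Putting these suggestions together, here is a minimal sketch of a merge-free add_supertrend. It assumes stockstats 0.4.x semantics (accessing computed columns caches them on the wrapped frame) and nulls out supertrend_ub/supertrend_lb directly, which is what the sample output in the question shows; adapt the column handling if separate st_lower/st_upper columns were intended instead.

        import numpy as np
        import pandas as pd
        import stockstats

        def add_supertrend(market):
            df = pd.read_parquet(market, engine="fastparquet")

            # Wrap in place: 'datetime' becomes the index; no copy, no merge later
            st = stockstats.wrap(df, index_column="datetime")

            # Accessing the indicator columns computes and caches them on st
            st[['supertrend', 'supertrend_ub', 'supertrend_lb']]

            conditions = [
                st['supertrend_ub'] == st['supertrend'],
                st['supertrend_lb'] == st['supertrend'],
            ]
            st['trend_orientation'] = np.select(conditions, [-1, 1])

            # Blank out the band that does not apply to the current trend
            st.loc[st['trend_orientation'] < 0, 'supertrend_lb'] = np.nan
            st.loc[st['trend_orientation'] > 0, 'supertrend_ub'] = np.nan

            # Trend segments, as in the original code
            st_to = st['trend_orientation']
            st['st_trend_segment'] = st_to.ne(st_to.shift()).cumsum()

            st.drop(columns=['supertrend'], inplace=True)

            # st already contains the original data; restore 'datetime' as a
            # regular column and write out, no merge required
            st.reset_index().to_parquet(market, compression=None)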
