
What is the best way to clean the XCom table in Airflow running in Docker with a Postgres database?

I tried deleting data with a query (DELETE FROM xcom) and also tried the approach in this reference: https://cloud.google.com/composer/docs/cleanup-airflow-database but neither worked; the size of the xcom table did not decrease.

This matters because the table takes up a lot of storage on my host server.

Edit: the version I use is Airflow 1.10.3.

[Image: xcom db]

2 Answers


  1. Chosen as BEST ANSWER
    DELETE FROM xcom WHERE current_date - "timestamp"::date > 14;
    

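    After executing that query, run VACUUM FULL; from the host. A plain DELETE only marks rows as dead, so the table files keep their size; VACUUM FULL rewrites the table and returns the freed space to the operating system.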

    [Image: xcom db after cleanup]
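
The two steps above can also be scripted. What follows is a minimal sketch, not a definitive implementation: it assumes a psycopg2-style connection to the Airflow metadata database, and the helper names `xcom_cleanup_statements` and `run_xcom_cleanup` and the `retention_days` parameter are hypothetical, not part of Airflow. It runs `VACUUM FULL xcom` (one table) rather than a database-wide `VACUUM FULL;`, which is a narrower variant of the answer. VACUUM cannot run inside a transaction block, so the connection is switched to autocommit first.

```python
def xcom_cleanup_statements(retention_days=14):
    """Return the SQL statements for the cleanup, in execution order."""
    if not isinstance(retention_days, int) or retention_days < 0:
        raise ValueError("retention_days must be a non-negative integer")
    delete_sql = (
        'DELETE FROM xcom '
        f'WHERE current_date - "timestamp"::date > {retention_days}'
    )
    # Assumption: vacuum only the xcom table instead of the whole database.
    return [delete_sql, "VACUUM FULL xcom"]


def run_xcom_cleanup(conn, retention_days=14):
    """Run the cleanup on an open psycopg2-style connection (assumed)."""
    # VACUUM cannot run inside a transaction block, so every statement
    # has to be committed on its own.
    conn.autocommit = True
    cur = conn.cursor()
    try:
        for statement in xcom_cleanup_statements(retention_days):
            cur.execute(statement)
    finally:
        cur.close()
```

With psycopg2 installed, this would be called as `run_xcom_cleanup(psycopg2.connect(...))` using your own connection settings. Note that VACUUM FULL takes an exclusive lock on the table while it runs, so it is best done while no tasks are pushing or pulling XComs.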


  2. With every question, please provide details (versions, stack traces, etc.) about what you tried and what didn’t work. Saying "it doesn’t work" tells us nothing and makes it difficult to help. See https://stackoverflow.com/help/how-to-ask for how to write a good question.

    If you have direct access to the metastore, you could execute this query:

    DELETE FROM xcom;
    

    The XCom table contains a timestamp which you can use to preserve recent XComs. For example, this query deletes all XComs older than 14 days:

    DELETE FROM xcom WHERE current_date - "timestamp"::date > 14;
    

    If you do not have direct access to the metastore, you could create a DAG to clean up objects in the metastore. Whether you run it on a schedule to clean up periodically, or without a schedule and trigger it manually whenever you want to clean up the database, is up to you:

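    import datetime

    from airflow import DAG
    from airflow.models import XCom
    # Airflow 2.x import paths. On Airflow 1.10.x, use
    # airflow.operators.python_operator and airflow.utils.db instead.
    from airflow.operators.python import PythonOperator
    from airflow.utils.session import provide_session

    # schedule_interval=None: the DAG only runs when triggered manually.
    with DAG(dag_id="cleanup_xcoms", schedule_interval=None, start_date=datetime.datetime(2022, 1, 1)) as dag:

        @provide_session
        def _delete_xcoms(session=None):
            try:
                # Bulk-delete every row in the xcom table.
                num_rows_deleted = session.query(XCom).delete()
                session.commit()
            except Exception:
                # Undo the partial delete and fail the task instead of hiding the error.
                session.rollback()
                raise

            print(f"Deleted {num_rows_deleted} XCom rows")

        delete_xcoms = PythonOperator(task_id="delete_xcoms", python_callable=_delete_xcoms)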
    

    The XCom object has several attributes which you could filter on, for example dag_id:

    session.query(XCom).filter(XCom.dag_id == "mydag123").delete()
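
The dag_id filter can be combined with the timestamp to keep recent XComs, mirroring the 14-day SQL query. This is a minimal sketch under stated assumptions: `xcom_cutoff`, `delete_old_xcoms`, and `retention_days` are hypothetical names, and `session` is assumed to be the SQLAlchemy session that `provide_session` injects. Unlike the SQL version, which compares calendar dates, this compares full timestamps, so the boundary can differ by up to a day.

```python
import datetime


def xcom_cutoff(retention_days, now=None):
    """Return the timezone-aware instant before which XComs count as old."""
    now = now or datetime.datetime.now(datetime.timezone.utc)
    return now - datetime.timedelta(days=retention_days)


def delete_old_xcoms(session, retention_days=14, dag_id=None):
    """Delete XComs older than retention_days, optionally for one DAG only."""
    # Imported here so that xcom_cutoff can be used without Airflow installed.
    from airflow.models import XCom

    query = session.query(XCom).filter(XCom.timestamp < xcom_cutoff(retention_days))
    if dag_id is not None:
        query = query.filter(XCom.dag_id == dag_id)
    # Bulk delete; skip syncing in-session objects since none are loaded.
    return query.delete(synchronize_session=False)
```

Inside `_delete_xcoms`, the call `session.query(XCom).delete()` could then be swapped for `delete_old_xcoms(session, retention_days=14, dag_id="mydag123")`.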
    