
I’m trying to build a minimal data pipeline using docker, postgres, and airflow. My docker-compose.yaml file can be found here and is extended from airflow’s documentation here. I’ve extended it to include a separate postgres database where I will write data, and a pgadmin instance (these are added near the bottom).

I can confirm that the services are running and accessible when I run docker compose up -d, and I can log into the airflow web UI to see my dags. I’ve created a very simple dag that inserts the current date and time into a table every minute. The dag code is shown below:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
from airflow.providers.postgres.hooks.postgres import PostgresHook

default_args = {
    'owner': 'airflow',
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2024, 1, 1),
}

def create_table():
    pg_hook = PostgresHook(postgres_conn_id='postgres_default')
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    create_query = """
        CREATE TABLE IF NOT EXISTS fact_datetime (
            datetime TIMESTAMP
        );
    """
    cursor.execute(create_query)
    conn.commit()
    cursor.close()
    conn.close()

def insert_datetime():
    pg_hook = PostgresHook(postgres_conn_id='postgres_default')
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    insert_query = """
        INSERT INTO fact_datetime (datetime)
        VALUES (NOW());
    """
    cursor.execute(insert_query)
    conn.commit()
    cursor.close()
    conn.close()

with DAG('insert_datetime_dag',
         default_args=default_args,
         description='DAG to insert current datetime every minute',
         schedule_interval='*/1 * * * *',
         catchup=False) as dag:

    create_table_task = PythonOperator(
        task_id='create_table',
        python_callable=create_table,
    )

    insert_datetime_task = PythonOperator(
        task_id='insert_datetime',
        python_callable=insert_datetime,
    )

    create_table_task >> insert_datetime_task

Before running this dag, I’ve added a postgres connection in the airflow web UI, which should allow me to use the PostgresHook.
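
For reference, the connection I added in the UI is equivalent to something like this on the Airflow CLI (the host is localhost; the other values are placeholders):

airflow connections add 'postgres_default' \
    --conn-type 'postgres' \
    --conn-host 'localhost' \
    --conn-port '5432' \
    --conn-login '<username>' \
    --conn-password '<password>' \
    --conn-schema '<database>'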

When I run the dag, the runs seem to be stuck on the create_table task, with the following logs:


ce682335169d
*** Found local files:
***   * /opt/airflow/logs/dag_id=insert_datetime_dag/run_id=scheduled__2024-01-02T17:24:00+00:00/task_id=create_table/attempt=1.log
[2024-01-02, 17:25:26 UTC] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=non-requeueable deps ti=<TaskInstance: insert_datetime_dag.create_table scheduled__2024-01-02T17:24:00+00:00 [queued]>
[2024-01-02, 17:25:26 UTC] {taskinstance.py:1957} INFO - Dependencies all met for dep_context=requeueable deps ti=<TaskInstance: insert_datetime_dag.create_table scheduled__2024-01-02T17:24:00+00:00 [queued]>
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2171} INFO - Starting attempt 1 of 2
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2192} INFO - Executing <Task(PythonOperator): create_table> on 2024-01-02 17:24:00+00:00
[2024-01-02, 17:25:26 UTC] {standard_task_runner.py:60} INFO - Started process 148 to run task
[2024-01-02, 17:25:26 UTC] {standard_task_runner.py:87} INFO - Running: ['***', 'tasks', 'run', 'insert_datetime_dag', 'create_table', 'scheduled__2024-01-02T17:24:00+00:00', '--job-id', '7', '--raw', '--subdir', 'DAGS_FOLDER/dag.py', '--cfg-path', '/tmp/tmpkkdtejih']
[2024-01-02, 17:25:26 UTC] {standard_task_runner.py:88} INFO - Job 7: Subtask create_table
[2024-01-02, 17:25:26 UTC] {task_command.py:423} INFO - Running <TaskInstance: insert_datetime_dag.create_table scheduled__2024-01-02T17:24:00+00:00 [running]> on host ce682335169d
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2481} INFO - Exporting env vars: AIRFLOW_CTX_DAG_OWNER='***' AIRFLOW_CTX_DAG_ID='insert_datetime_dag' AIRFLOW_CTX_TASK_ID='create_table' AIRFLOW_CTX_EXECUTION_DATE='2024-01-02T17:24:00+00:00' AIRFLOW_CTX_TRY_NUMBER='1' AIRFLOW_CTX_DAG_RUN_ID='scheduled__2024-01-02T17:24:00+00:00'
[2024-01-02, 17:25:26 UTC] {base.py:83} INFO - Using connection ID 'postgres_default' for task execution.
[2024-01-02, 17:25:26 UTC] {taskinstance.py:2699} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 433, in _execute_task
    result = execute_callable(context=context, **execute_callable_kwargs)
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 199, in execute
    return_value = self.execute_callable()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/operators/python.py", line 216, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/airflow/dags/dag.py", line 16, in create_table
    conn = pg_hook.get_conn()
  File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/postgres/hooks/postgres.py", line 158, in get_conn
    self.conn = psycopg2.connect(**conn_args)
  File "/home/airflow/.local/lib/python3.8/site-packages/psycopg2/__init__.py", line 122, in connect
    conn = _connect(dsn, connection_factory=connection_factory, **kwasync)
psycopg2.OperationalError: connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
    Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (::1), port 5432 failed: Cannot assign requested address
    Is the server running on that host and accepting TCP/IP connections?
[2024-01-02, 17:25:26 UTC] {taskinstance.py:1138} INFO - Marking task as UP_FOR_RETRY. dag_id=insert_datetime_dag, task_id=create_table, execution_date=20240102T172400, start_date=20240102T172526, end_date=20240102T172526
[2024-01-02, 17:25:26 UTC] {standard_task_runner.py:107} ERROR - Failed to execute job 7 for task create_table (connection to server at "localhost" (127.0.0.1), port 5432 failed: Connection refused
    Is the server running on that host and accepting TCP/IP connections?
connection to server at "localhost" (::1), port 5432 failed: Cannot assign requested address
    Is the server running on that host and accepting TCP/IP connections?
; 148)
[2024-01-02, 17:25:26 UTC] {local_task_job_runner.py:234} INFO - Task exited with return code 1
[2024-01-02, 17:25:26 UTC] {taskinstance.py:3281} INFO - 0 downstream tasks scheduled from follow-on schedule check

If I’ve read this correctly, it seems airflow cannot see my postgres instance. This should be solved by exposing port 5432 to one of the airflow services.

I’m not sure which service needs the port exposed, or how to edit my docker compose file to do it. Could someone please:

  • Let me know if I’m correct in my assessment of the problem, and
  • Suggest the correct edits to my docker compose file so I can run my dag successfully.

2 Answers


  1. Docker Compose creates a DNS entry for each service you define in your docker-compose.yml file; by default it uses the name of the service. You will not be able to reach a service without using its name as defined in docker-compose.yml.

    You are attempting to connect to localhost, but your docker-compose defines the PostgreSQL hostnames as postgres and db. Judging by the file, the one you added is db:

     postgres:
        image: postgres:13
        environment:
          POSTGRES_USER: airflow
          POSTGRES_PASSWORD: airflow
          POSTGRES_DB: airflow
        volumes:
          - postgres-db-volume:/var/lib/postgresql/data
        healthcheck:
          test: ["CMD", "pg_isready", "-U", "airflow"]
          interval: 10s
          retries: 5
          start_period: 5s
        restart: always
    
     db:
        image: postgres:13
        environment:
          POSTGRES_USER: root
          POSTGRES_PASSWORD: root
          POSTGRES_DB: airflow_db
        volumes:
          - "./postgres_data:/var/lib/postgresql/data:rw"
        ports:
          - "5432:5432"
    

    Change your connection to use the hostname db (the database you added) or postgres (the Airflow metadata database) instead of localhost.
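
    To check that the service name resolves before re-running the DAG, you can test from inside one of the Airflow containers. A quick sketch (the airflow-scheduler service name assumes the stock compose file):

     docker compose exec airflow-scheduler python -c "import socket; socket.create_connection(('db', 5432), timeout=3); print('db reachable')"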

  2. The Fix:

    You should be able to connect to the db Postgres service by setting your postgres_default connection so that it points to that service:

    Under line 55 of your docker-compose:

    AIRFLOW_CONN_POSTGRES_DEFAULT: postgres://root:root@db:5432/airflow_db
    

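    For context: in the stock Airflow compose file, the environment block this belongs to sits under the shared x-airflow-common section, so the addition would look roughly like this (existing keys abbreviated):

     x-airflow-common:
       environment:
         # ...existing AIRFLOW__* settings...
         AIRFLOW_CONN_POSTGRES_DEFAULT: postgres://root:root@db:5432/airflow_db
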
    Additional notes:

    • I do not have the ability to add a comment under your question, but the comment stating "It looks like you’re trying to connect to the database on localhost, but the database is actually running on hostname postgres" is somewhat incorrect. It is true that there is a postgres service running under the hostname postgres, but it is not the postgres service you added in your docker-compose. The service you added is running under the hostname db.
    • If your goal is to use the database outside of Airflow, you might consider using a PostgreSQL database running locally, outside of Docker. Assuming your port number and database name are the same, the line in your docker-compose would be as follows (a Linux-specific caveat is noted after this list):
    AIRFLOW_CONN_POSTGRES_DEFAULT=postgres://<username>:<password>@host.docker.internal:5432/airflow_db
    
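    One caveat on host.docker.internal: it is not defined by default on Linux, so with that approach each Airflow service would also need an extra_hosts entry, roughly:

     extra_hosts:
       - "host.docker.internal:host-gateway"
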