
I have Apache Airflow running, started with docker-compose. I also have a separate docker-compose project that I'd like to manage from Airflow.

example_workers/docker-compose.yml:

version: '3'

services:

  my_app:
    build:
      context: ./my_app/
    environment:
      TEST: 1
      INDOCKER: 1
    restart: "no"
    volumes:
      - ./my_app:/my_app
    depends_on:
     - redis

  redis:
    build:
      context: ./redis/.
    restart: always
    volumes:
      - ./redis/data:/data

To be more specific: I'd like to run docker-compose build && docker-compose up -d in my project. Redis starts and keeps running, while my_app runs once and then stops. After that I'd like to start the my_app container from Airflow.

However, DockerOperator always seems to create a new container before running it, which in turn demands extra configuration.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.docker.operators.docker import DockerOperator

dag = DAG(
    'my_app',
    default_args={'retries': 0},
    schedule_interval=timedelta(minutes=10),
    start_date=datetime(2021, 1, 1),
    catchup=False,
)

t0 = DockerOperator(
    docker_url='unix:///var/run/docker.sock',
    image='example_workers_my_app',
    container_name='example_workers_my_app_1',
    task_id='docker_op_tester',
    dag=dag,
    mount_tmp_dir=False
)

Logs:

[2022-05-19, 18:15:17 UTC] {taskinstance.py:1889} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 268, in _raise_for_status
    response.raise_for_status()
  File "/home/airflow/.local/lib/python3.7/site-packages/requests/models.py", line 960, in raise_for_status
    raise HTTPError(http_error_msg, response=self)
requests.exceptions.HTTPError: 409 Client Error: Conflict for url: http+docker://localhost/v1.39/containers/create?name=example_workers_my_app_1

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/docker/operators/docker.py", line 387, in execute
    return self._run_image()
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/docker/operators/docker.py", line 266, in _run_image
    return self._run_image_with_mounts(self.mounts, add_tmp_variable=False)
  File "/home/airflow/.local/lib/python3.7/site-packages/airflow/providers/docker/operators/docker.py", line 298, in _run_image_with_mounts
    tty=self.tty,
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/container.py", line 428, in create_container
    return self.create_container_from_config(config, name)
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/container.py", line 439, in create_container_from_config
    return self._result(res, True)
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 274, in _result
    self._raise_for_status(response)
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/api/client.py", line 270, in _raise_for_status
    raise create_api_error_from_http_exception(e)
  File "/home/airflow/.local/lib/python3.7/site-packages/docker/errors.py", line 31, in create_api_error_from_http_exception
    raise cls(e, response=response, explanation=explanation)
docker.errors.APIError: 409 Client Error for http+docker://localhost/v1.39/containers/create?name=example_workers_my_app_1: Conflict ("Conflict. The container name "/example_workers_my_app_1" is already in use by container "7e40949857be7a3e2ea3b6001562d0cde285c4ca2a0aa1add4851021ba9a9ed6". You have to remove (or rename) that container to be able to reuse that name.")

If I manually remove the container and set auto_remove=True in the DockerOperator, it works fine, but I have two concerns. First, I now have to configure the container in the operator itself (correct network, environment variables, etc.), which duplicates the settings already in docker-compose.yml. Second, constantly creating and removing the same container, instead of reusing an existing one, adds overhead.
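
For reference, the working-but-duplicated variant looks roughly like the sketch below. The network name, environment values and mount path are assumptions on my side and would have to mirror whatever docker-compose actually created:

from docker.types import Mount
from airflow.providers.docker.operators.docker import DockerOperator

# Works, but every relevant setting from docker-compose.yml has to be repeated here.
# 'example_workers_default' is the assumed compose-created network; check `docker network ls`.
t0 = DockerOperator(
    task_id='docker_op_tester',
    docker_url='unix:///var/run/docker.sock',
    image='example_workers_my_app',
    auto_remove=True,                          # throw the container away after each run
    network_mode='example_workers_default',
    environment={'TEST': '1', 'INDOCKER': '1'},
    mounts=[Mount(source='/absolute/path/to/example_workers/my_app',
                  target='/my_app', type='bind')],
    mount_tmp_dir=False,
    dag=dag,
)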

Should I use another operator type, such as BashOperator, to achieve the desired result? And would I still get as much useful information about my jobs (e.g. Docker-specific logs) with BashOperator as I do with DockerOperator?

2 Answers


  1. Chosen as BEST ANSWER

    It turned out that the easiest way for me to solve this, given that Airflow itself runs in Docker (which makes BashOperator awkward, since there is no docker executable inside the Airflow containers) and that DockerOperator is already up and running, was to modify DockerOperator itself: when it finds a container with the same name, it no longer raises an exception but simply starts the found container.

    WARNING: this is a dirty solution. With this change, DockerOperator silently ignores most of its parameters whenever an existing container is found. Besides, patching the source code makes future upgrades harder.

    In a docker-compose-based Airflow installation, the DockerOperator source code is located at /home/airflow/.local/lib/python3.7/site-packages/airflow/providers/docker/operators/docker.py.

    I've wrapped this part (thanks to @kthompso for pointing to the exact location in the file):

    self.container = self.cli.create_container(
        ...
    )
    

    With a try/except:

    from docker.errors import APIError  # add near the top of docker.py if it isn't imported already

    try:
        self.container = self.cli.create_container(
            ...
        )
    except APIError as e:
        # 409 Conflict with "container name" in the explanation means the name is
        # already taken, i.e. the container exists -- fetch it instead of failing.
        if (e.status_code != 409) or ('container name' not in e.explanation):
            raise
        self.container = self.cli.containers(
            filters={'name': self.container_name},
            all=True, quiet=True, latest=True
        )[0]
    

    If the Docker client fails to create a container with the given name due to a 409 Conflict, and the error explanation mentions "container name", then the container most likely already exists and can be fetched with the same client. Luckily, the client has a suitable method for that, containers() (see the standalone sketch after the docstring). Here is the part of its docstring that explains the arguments used above:

    """
    List containers. Similar to the ``docker ps`` command.
    
    Args:
        quiet (bool): Only display numeric Ids
        all (bool): Show all containers. Only running containers are shown
            by default
        ...
        latest (bool): Show only the latest created container, include
            non-running ones.
        ...
        filters (dict): Filters to be processed on the image list.
            Available filters:
    
            ...
            - `name` (str): The name of the container.
        ...
    
    Returns:
        A list of dicts, one per container
    """
    

  2. Looking at the code, it seems that the DockerOperator is designed to create a new container, not start an existing one. So I don’t think you will have much success doing it that way unless you start a new container with privileged access to the docker socket and then execute a command to start the existing docker container. That feels a bit circuitous, although it’s technically a solution.

    I would suggest either using a BashOperator (as you mentioned) or using a package like docker-py to execute docker start commands from within a Python DAG, as sketched below.
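
    Not an authoritative implementation, but a minimal sketch of the docker-py route. It assumes the docker Python package is available inside the Airflow containers (it is when the Docker provider is installed) and that /var/run/docker.sock is mounted into them:

    import docker
    from airflow.operators.python import PythonOperator


    def start_existing_container(container_name: str) -> None:
        """Start an existing (stopped) container by name and wait for it to finish."""
        client = docker.from_env()                  # talks to the mounted docker socket
        container = client.containers.get(container_name)
        container.start()
        result = container.wait()                   # block until my_app exits
        print(container.logs(tail=100).decode())    # surface the container logs in the task log
        if result.get('StatusCode', 0) != 0:
            raise RuntimeError(f'{container_name} exited with {result}')


    start_my_app = PythonOperator(
        task_id='start_my_app',
        python_callable=start_existing_container,
        op_kwargs={'container_name': 'example_workers_my_app_1'},
        dag=dag,                                    # the DAG object from the question
    )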
