I am using the following docker-compose file, which I got from: https://github.com/apache/airflow/blob/main/docs/apache-airflow/start/docker-compose.yaml
version: "3"
x-airflow-common: &airflow-common
image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.0.0-python3.7}
environment: &airflow-common-env
AIRFLOW__CORE__EXECUTOR: CeleryExecutor
AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
AIRFLOW__CORE__FERNET_KEY: ""
AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: "true"
AIRFLOW__CORE__LOAD_EXAMPLES: "false"
AIRFLOW__API__AUTH_BACKEND: "airflow.api.auth.backend.basic_auth"
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./plugins:/opt/airflow/plugins
user: "${AIRFLOW_UID:-50000}:${AIRFLOW_GID:-50000}"
depends_on: &airflow-common-depends-on
redis:
condition: service_healthy
postgres:
condition: service_healthy
services:
postgres:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
volumes:
- postgres-db-volume:/var/lib/postgresql/data
healthcheck:
test: ["CMD", "pg_isready", "-U", "airflow"]
interval: 5s
retries: 5
restart: always
redis:
image: redis:latest
ports:
- 6379:6379
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 30s
retries: 50
restart: always
airflow-webserver:
<<: *airflow-common
command: webserver
ports:
- 8080:8080
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-scheduler:
<<: *airflow-common
command: scheduler
healthcheck:
test:
[
"CMD-SHELL",
'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"',
]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-worker:
<<: *airflow-common
command: celery worker
healthcheck:
test:
- "CMD-SHELL"
- 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
airflow-init:
<<: *airflow-common
command: version
environment:
<<: *airflow-common-env
_AIRFLOW_DB_UPGRADE: "true"
_AIRFLOW_WWW_USER_CREATE: "true"
_AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
_AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
flower:
<<: *airflow-common
command: celery flower
ports:
- 5555:5555
healthcheck:
test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
interval: 10s
timeout: 10s
retries: 5
restart: always
depends_on:
<<: *airflow-common-depends-on
airflow-init:
condition: service_completed_successfully
######################################################
# SPARK SERVICES
######################################################
jupyterlab:
image: andreper/jupyterlab:3.0.0-spark-3.0.0
container_name: jupyterlab
ports:
- 8888:8888
- 4040:4040
volumes:
- shared-workspace:/opt/workspace
spark-master:
image: andreper/spark-master:3.0.0
container_name: spark-master
ports:
- 8081:8080
- 7077:7077
volumes:
- shared-workspace:/opt/workspace
spark-worker-1:
image: andreper/spark-worker:3.0.0
container_name: spark-worker-1
environment:
- SPARK_WORKER_CORES=1
- SPARK_WORKER_MEMORY=512m
ports:
- 8082:8081
volumes:
- shared-workspace:/opt/workspace
depends_on:
- spark-master
spark-worker-2:
image: andreper/spark-worker:3.0.0
container_name: spark-worker-2
environment:
- SPARK_WORKER_CORES=1
- SPARK_WORKER_MEMORY=512m
ports:
- 8083:8081
volumes:
- shared-workspace:/opt/workspace
depends_on:
- spark-master
volumes:
postgres-db-volume:
shared-workspace:
name: "jordi_airflow"
driver: local
driver_opts:
type: "none"
o: "bind"
device: "/Users/jordicrespoguzman/Projects/custom_airflow_spark/spark_folder"
I am trying to run the following DAG:
from airflow import DAG
from airflow.providers.http.sensors.http import HttpSensor
from airflow.sensors.filesystem import FileSensor
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
from airflow.operators.email import EmailOperator
from datetime import datetime, timedelta

import csv
import requests
import json

default_args = {
    "owner": "airflow",
    "email_on_failure": False,
    "email_on_retry": False,
    "email": "[email protected]",
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}


def printar():
    print("success!")


with DAG(
    "forex_data_pipeline",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    default_args=default_args,
    catchup=False,
) as dag:

    downloading_rates = PythonOperator(task_id="test1", python_callable=printar)

    forex_processing = SparkSubmitOperator(
        task_id="spark1",
        application="/opt/airflow/dags/test.py",
        conn_id="spark_conn",
        verbose=False,
    )

    downloading_rates >> forex_processing
But I see this error in the Airflow UI:
Broken DAG: [/opt/airflow/dags/dag_spark.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/opt/airflow/dags/dag_spark.py", line 7, in <module>
from airflow.providers.apache.spark.operators.spark_submit import SparkSubmitOperator
ModuleNotFoundError: No module named 'airflow.providers.apache'
I have specified the additional requirements to install in the docker-compose file:
_PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-apache-airflow-providers-apache-spark}
Am I writing it wrong? How should I specify the additional requirements I want to install in Airflow? Can I pass a requirements.txt? If so, how do I specify the path?
2 Answers
Support for the _PIP_ADDITIONAL_REQUIREMENTS environment variable has not been released yet. It is only supported by the developer/unreleased version of the Docker image. It is planned that this feature will be available in Airflow 2.1.1. For more information, see: Adding extra requirements for build and runtime of the PROD image.
For the older version, you should build a new image and set this image in the docker-compose.yaml. To do this, you need to follow a few steps: create a Dockerfile that installs the missing provider on top of the base image, build it, and point the docker-compose.yaml file at the new image (a sketch of these steps follows below).
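A minimal sketch of those steps, assuming the base image and provider package used in the compose file above; the tag my-airflow:2.0.0-spark is only a placeholder, not something from the original answer.
Dockerfile:
FROM apache/airflow:2.0.0-python3.7
# install the Spark provider that the DAG imports
RUN pip install --no-cache-dir apache-airflow-providers-apache-spark
Build the new image:
docker build . --tag my-airflow:2.0.0-spark
Then point the shared image entry in docker-compose.yaml at that tag:
x-airflow-common: &airflow-common
  image: ${AIRFLOW_IMAGE_NAME:-my-airflow:2.0.0-spark}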
For more information, see:
Official guide about running Airflow in docker-compose environment
In particular, I recommend the fragment that describes what to do when you need to install a new pip package.
I also recommend checking out the guide about building the Docker image, which explains how to install even more complex dependencies.
Finally, I recommend only using docker-compose files from the official website that are intended for your specific version. Docker-compose files from newer versions may not work with older versions of Airflow, because we are making many improvements to these files all the time to improve stability, reliability, and user experience.
I used the _PIP_ADDITIONAL_REQUIREMENTS environment variable (note: separate multiple packages with a single space). Then, when the Airflow Docker container first runs, it will automatically pip install the listed packages.
[Reference] _PIP_ADDITIONAL_REQUIREMENTS, e.g. lxml==4.6.3 charset-normalizer==1.4.1. Available in Airflow image 2.1.1 and above.
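For reference, a sketch of where that variable sits in the environment block of the compose file from the question; the package pins are just this answer's example, and for the Spark provider in the question the value would instead be apache-airflow-providers-apache-spark.
environment: &airflow-common-env
  # ...other variables unchanged...
  _PIP_ADDITIONAL_REQUIREMENTS: lxml==4.6.3 charset-normalizer==1.4.1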