I’m trying to set up Airflow as a scheduler for an existing project. After opening Airflow I get this error message:
Broken DAG: [/opt/airflow/dags/test_dag.py] Traceback (most recent call last):
File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
File "/opt/airflow/dags/test_dag.py", line 4, in <module>
from src.subfolder.sql import MyClass
ModuleNotFoundError: No module named 'src'
This is my folder structure:
project
+-- airflow
    +-- dags
        +-- my_dag.py
    +-- logs
    +-- plugins
    docker-compose.yaml
    Dockerfile
+-- src
    +-- subfolder1
        +-- __init__.py
        +-- sql.py
        +-- something.py
    +-- subfolder2
        +-- __init__.py
        +-- something_more.py
        +-- random_things.py
my_dag.py calls:
from src.subfolder1.sql import MyClass
from src.subfolder2.random_things import my_method
My Dockerfile contains:
FROM apache/airflow:2.2.3
WORKDIR /opt/airflow
ADD ./requirements.txt /opt/airflow/requirements.txt
RUN python -m pip install --upgrade pip wheel
USER airflow
RUN pip install -r requirements.txt
I would prefer to keep this folder structure. I think the error could be solved by turning the Python files into a package, but I’m lost here.
2 Answers
Airflow cannot import those files because the folder that contains "src" is not on Python's module search path.
By default, Airflow adds the "dags" folder to that search path (sys.path, which PYTHONPATH feeds into); the system PATH is not involved.
Since you don’t want to change the folder structure, I suggest you try adding the folder that contains "src" to sys.path.
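A minimal sketch of that suggestion, assuming the "src" folder is actually available inside the container at the same relative position as in the question's layout (the Dockerfile shown does not copy it, so that part is an assumption). The helper name add_to_sys_path is made up for illustration and is not an Airflow API:

```python
import sys
from pathlib import Path


def add_to_sys_path(directory):
    """Put `directory` on sys.path once, so `import src...` can resolve."""
    resolved = str(Path(directory).resolve())
    if resolved not in sys.path:
        sys.path.insert(0, resolved)
    return resolved


# Hypothetical usage at the top of my_dag.py, before the `from src...` imports.
# With the layout project/airflow/dags/my_dag.py, parents[2] is "project",
# which is the folder that contains "src":
# add_to_sys_path(Path(__file__).resolve().parents[2])
```

Anchoring on `__file__` rather than on a relative path keeps the result independent of the directory the process was started from.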
One more piece of advice: reconsider this folder structure. If you expand the application, add more DAGs, and end up with multiple "src" folders, Airflow will not know which folder to pick the functionality up from.
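To make that concrete: when two folders on sys.path each contain a package called "src", Python imports whichever one comes first. The sketch below reproduces this with two throwaway projects in a temporary directory; the names project_a and project_b and both helper functions are hypothetical, chosen only for the demonstration:

```python
import importlib
import sys
import tempfile
from pathlib import Path


def make_src_package(root, label):
    """Create root/src/__init__.py that records which project it came from."""
    package_dir = Path(root) / "src"
    package_dir.mkdir(parents=True)
    (package_dir / "__init__.py").write_text(f"ORIGIN = {label!r}\n")
    return str(root)


def import_src_with(search_path):
    """Import `src` with `search_path` placed at the front of sys.path."""
    saved_path = list(sys.path)
    sys.modules.pop("src", None)  # forget any earlier import of `src`
    sys.path[:0] = search_path
    importlib.invalidate_caches()
    try:
        return importlib.import_module("src").ORIGIN
    finally:
        # Restore the interpreter state so the demo has no side effects.
        sys.path[:] = saved_path
        sys.modules.pop("src", None)


base = Path(tempfile.mkdtemp())
project_a = make_src_package(base / "project_a", "project_a")
project_b = make_src_package(base / "project_b", "project_b")

# The same `import src` resolves differently depending on the path order.
print(import_src_with([project_a, project_b]))
print(import_src_with([project_b, project_a]))
```

Giving each project a uniquely named top-level package avoids the ambiguity altogether.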
I have a structure very similar to yours. I haven’t figured out a ‘clean’ method for packaging my modules yet, so I’m using a workaround at the moment.
1: You can add the folder that contains your ‘src’ folder (the ‘project’ folder in your case) to sys.path at runtime. You can use whichever Python library you normally use for OS-related work; I prefer pathlib:
from pathlib import Path
import sys
# for your folder structure
# note: a relative path is resolved against the current working directory
sys.path.append(str(Path('../../').resolve()))
2: Basically the same idea as above, but for the case where Airflow runs your DAG from a temporary folder: the sys.path variable luckily still contains the path to your original file, so append that instead of using Path() (or os.getcwd()).
import sys
# for your folder structure
module_path = '/'.join(sys.path[0].split('/')[:-2])
sys.path.append(module_path)
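Both workarounds hard-code "two levels up". A slightly more forgiving variant, sketched here under the assumption that the DAG file sits somewhere below the folder holding "src", walks upwards until it finds that folder. The function names find_root_containing and ensure_importable are invented for this example:

```python
import sys
from pathlib import Path


def find_root_containing(start, marker="src"):
    """Walk upwards from `start`; return the first directory with a `marker` subfolder."""
    start = Path(start).resolve()
    for candidate in [start, *start.parents]:
        if (candidate / marker).is_dir():
            return candidate
    return None


def ensure_importable(start, marker="src"):
    """Append the directory that contains `marker` to sys.path, if one is found."""
    root = find_root_containing(start, marker)
    if root is not None and str(root) not in sys.path:
        sys.path.append(str(root))
    return root


# Hypothetical usage at the top of my_dag.py:
# ensure_importable(__file__)
```

If no matching folder exists, the function returns None and leaves sys.path untouched, so the original ModuleNotFoundError still surfaces instead of being masked.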