
I’m currently attempting to build a data pipeline using the puckel docker-airflow 1.10.9 image on Windows.

When the functions are defined in the DAG file itself and called from the DAG, everything works and I can access the output of the code.

However, when I try to import the function from the my_scripts directory, which sits at the same level as the my_dags directory, I get the following error:

NameError: name 'reorder_columns' is not defined

Below is the folder structure that I am using for my airflow:

airflow
└── dags
    ├── dockerfiles
    │   └── Dockerfile
    ├── docker-compose.yml
    ├── trading_pipeline
    │   ├── my_dags
    │   │   ├── demand_forecast_dag.py
    │   │   ├── __init__.py
    │   │   └── DataStore.py
    │   ├── my_scripts
    │   │   ├── demand_forecast.py
    │   │   └── __init__.py
    │   └── __init__.py
    └── __init__.py

Below is the demand_forecast_dag file:

try:
    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    import requests
    import xml.etree.ElementTree as ET
    import pandas as pd
    import DataStore
    import os
    import datetime

    from dags.trading_pipeline.my_scripts.demand_forecast import reorder_columns

    print("All DAG modules are ok......")

except Exception as e:
    print("Error {} ".format(e))

col_name_1 = "Demand_Forecast"
col_name_2 = "Settlement_Period"
col_name_3 = "Date"

with DAG(
    dag_id="demand_forecast_dag",
    schedule_interval="0 9 * * *",
    default_args={
        "owner": "airflow",
        "retries": 1,
        "retry_delay": datetime.timedelta(minutes=5),
        "start_date": datetime.datetime(2021, 1, 1),
    },
    catchup=False,
) as dag:

    get_demand_forecast = PythonOperator(
        task_id="reorder_columns",
        python_callable=reorder_columns,
        op_args=[col_name_1, col_name_2, col_name_3],
    )
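Worth noting for anyone hitting the same traceback: the try/except around the imports swallows the ImportError, so the DAG file still loads and the failure only surfaces later, as a NameError, when the name is first used. A minimal sketch of that behaviour (using a deliberately nonexistent module, not the real project):

```python
# Sketch: a swallowed ImportError resurfaces later as a NameError.
try:
    from nonexistent_module import reorder_columns  # this import fails
except Exception as e:
    print("Error {} ".format(e))

# The module-level code keeps running; the name was never bound,
# so using it raises NameError rather than ImportError.
try:
    reorder_columns("Demand_Forecast", "Settlement_Period", "Date")
except NameError as err:
    error_message = str(err)
    print(error_message)  # name 'reorder_columns' is not defined
```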

Below is the demand_forecast.py file that the function is being imported into the DAG from:

import requests
import xml.etree.ElementTree as ET
import pandas as pd
import DataStore
import os
import datetime

settlement_datetime = datetime.date.today() + datetime.timedelta(days=1)
required_date = settlement_datetime.strftime("%Y-%m-%d")

col_name_1 = "Demand_Forecast"
col_name_2 = "Settlement_Period"
col_name_3 = "Date"


def clock_change_sp(settlement_date):
    """when there is a clock change, there is a different number of settlement periods, this function uses the
    DataStore file to determine the number of sp based on if the required date is a clock change
    :param settlement_date: uses the settlement date at the top of the script
    :return x + 1: the number of required settlement periods plus 1"""
    if settlement_date in DataStore.clocks_forward:
        x = 46
    elif settlement_date in DataStore.clocks_back:
        x = 50
    else:
        x = 48
    return x + 1


def get_demand_forecast(date):
    """uses the BMReports API to pull in the demand forecast for each settlement period of the following day
    :param date: the settlement date to request, in YYYY-MM-DD format
    :return: a list of the demand forecast for the following day"""
    api_key = "....."
    initial_data_list = []
    session = requests.Session()
    url = ("https://api.bmreports.com/BMRS/FORDAYDEM/V1?APIKey=" + api_key
           + "&FromDate=" + date + "&ToDate=" + date + "&ServiceType=xml")
    response = session.get(url)
    root = ET.fromstring(response.text)
    for item in root.findall('.//responseBody/responseList/item/demand'):
        initial_data_list.append(item.text)
    demand_forecast_list = initial_data_list[:clock_change_sp(required_date) - 1]
    return demand_forecast_list


def create_dataframes(col_1):
    """converts the required list from above into a dataframe, converts it to GW and rounds to 2 dp
    :param col_1: the name of the data column, "Demand_Forecast"
    :return df: a dataframe with the required data in GW and rounded to 2 dp
    """
    df = pd.DataFrame(get_demand_forecast(required_date), columns=[col_1])
    df[col_1] = df[col_1].astype(str).astype(int)
    df[col_1] = df[col_1].div(1000).round(2)
    # print(df)
    return df


def add_dates_and_sp(col_2, col_3, settlement_date):
    """adds the selected date and settlement periods to the dataframe
    :param col_2: the Settlement Period column name
    :param col_3: the Date column name
    :param settlement_date: the date the data applies to
    :return df: a dataframe with all the collected data plus the Date and Settlement Period"""
    df = create_dataframes(col_name_1)
    df[col_2] = list(range(1, clock_change_sp(required_date)))
    df[col_3] = settlement_date
    # print(df)
    return df


def reorder_columns(col_1, col_2, col_3):
    """reorders the columns so that the dataframe reads more easily
    :param col_1: the Demand Forecast column name
    :param col_2: the Settlement Period column name
    :param col_3: the Date column name
    :return df: a dataframe with the columns in the required order"""
    df = add_dates_and_sp(col_2, col_3, required_date)[[col_3, col_2, col_1]]
    print(df)
    return df

I am fairly new to airflow and docker so apologies if this is a simple error.

Let me know if you need any more information.

Thanks in advance,

Josh

2 Answers


  1. Chosen as BEST ANSWER

    After some troubleshooting and reading more Stack Overflow articles I managed to import the function from the script in another folder.

    There were two parts to it, the first involved setting my AIRFLOW_HOME and PYTHONPATH in my Dockerfile:

    ENV AIRFLOW_HOME=/usr/local/airflow
    ENV PYTHONPATH "${PYTHONPATH}:C:/Users/name/airflow/dags"
    

    I also added some volumes into my docker-compose.yml file:

    volumes:
    - ./trading_pipeline/my_dags:/usr/local/airflow/dags
    - ./trading_pipeline/my_scripts:/usr/local/airflow/dags/trading_pipeline
    

    Not sure if this is the best method but it is working.
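    As a sanity check outside the container, the effect of putting the dags directory on PYTHONPATH can be reproduced with a throwaway package; the layout below is an illustrative stand-in for the mounted trading_pipeline folder, not the real project files:

```python
import os
import sys
import tempfile

# Build a throwaway package mimicking the mounted layout
# (illustrative only; in the container the real files live
# under /usr/local/airflow/dags).
root = tempfile.mkdtemp()
pkg = os.path.join(root, "trading_pipeline")
os.makedirs(pkg)
open(os.path.join(pkg, "__init__.py"), "w").close()
with open(os.path.join(pkg, "demand_forecast.py"), "w") as fh:
    fh.write("def reorder_columns(*cols):\n    return list(cols)\n")

# Prepending to sys.path is what the ENV PYTHONPATH line achieves
sys.path.insert(0, root)
from trading_pipeline.demand_forecast import reorder_columns

print(reorder_columns("Date", "Settlement_Period", "Demand_Forecast"))
```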


  2. It’s nice to always keep functions in a separate file for clean code, but you don’t have to import a single function; you can easily import everything from the file by name. I think your import is wrong; you can import like this:

    from demand_forecast import *

    Also import os and sys and add the file’s directory to the path so that Python can find the file; at the moment it is not finding the files:

    import os
    import sys

    sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))

    You can pass the function straight to your python_callable, but you can also do it this way:

    var1 = reorder_columns(col_1, col_2, col_3)

    and then use var1 in your python_callable.

    Try this and let me know if you face any problem.
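    A minimal, runnable version of the sys.path trick above; the demand_forecast module is recreated in a temporary directory purely for illustration, whereas in the DAG file you would insert os.path.dirname(__file__) instead:

```python
import os
import sys
import tempfile

# Stand-in module; in the real project this is my_scripts/demand_forecast.py
tmp = tempfile.mkdtemp()
with open(os.path.join(tmp, "demand_forecast.py"), "w") as fh:
    fh.write("def reorder_columns(a, b, c):\n    return (c, b, a)\n")

# Same idea as sys.path.insert(0, os.path.abspath(os.path.dirname(__file__)))
sys.path.insert(0, tmp)
from demand_forecast import reorder_columns

print(reorder_columns("Demand_Forecast", "Settlement_Period", "Date"))
# prints ('Date', 'Settlement_Period', 'Demand_Forecast')
```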
