I have a question about using the Templates reference variable {{ds}}.
When I substitute it in PostgresOperator, everything seems to work.
With PostgresHook, however, it does not work:
def prc_mymys_update(procedure: str, type_agg: str):
    with PostgresHook(postgres_conn_id=CONNECTION_ID_GP).get_conn() as conn:
        with conn.cursor() as cur:
            with open(URL_YML_2, "r", encoding="utf-8") as f:
                ya_2 = yaml.safe_load(f)
            yml_mymts_2 = ya_2['type_agg']
            query_pg = ""
            if yml_mymts_2[0]['type_agg_name'] == "day" and type_agg == "day":
                sql_1 = yml_mymts_2[0]['sql']
                query_pg = f"""{sql_1}"""
            elif yml_mymts_2[1]['type_agg_name'] == "retention" and type_agg == "retention":
                sql_2 = yml_mymts_2[1]['sql']
                query_pg = f"""{sql_2}"""
            elif yml_mymts_2[2]['type_agg_name'] == "mau" and type_agg == "mau":
                sql_3 = yml_mymts_2[2]['sql']
                query_pg = f"""{sql_3}"""
            cur.execute(query_pg)
            dates_list = cur.fetchall()
            for date_res in dates_list:
                cur.execute(
                    "select from {}(%(date)s::date);".format(procedure),
                    {"date": date_res[0].strftime("%Y-%m-%d")},
                )
    conn.close()
The SQL comes from a YAML file:
type_agg:
  - type_agg_name: day
    sql: select calendar_date from entertainment_dds.v_calendar where calendar_date between '{{ds}}'::date - interval '7 days' and '{{ds}}'::date - 1 order by 1 desc
  - type_agg_name: retention
    sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc
  - type_agg_name: mau
    sql: select dt::date date_ from generate_series('{{execution_date.strftime('%Y-%m-%d')}}'::date - interval '7 days', '{{execution_date.strftime('%Y-%m-%d')}}'::date - interval '1 days', interval '1 days') dt order by 1 asc
When I run the DAG and it reaches the task that uses
  - type_agg_name: retention
    sql: SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc
I get this error:
psycopg2.errors.UndefinedColumn: column "y" does not exist
LINE 1: …((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}…
I tried to find information on how the Templates reference interacts with PostgresHook, but found nothing.
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html#templates-reference
2 Answers
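Note the single quotes in the following query:
    SELECT t.date::date AS date FROM generate_series((date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) - interval '11 month'), date_trunc('month','{{execution_date.strftime('%Y-%m-%d')}}'::date) , '1 month'::interval) t(date) order by 1 asc
Specifically, this part:
    '{{execution_date.strftime('%Y-%m-%d')}}'
You have two separate strings here, separated by the date format. Here's the first string:
    '{{execution_date.strftime('
This causes the date format to be parsed separately, outside of any string. If you wrap the date format in double quotes instead of single quotes, it should resolve this error. For example:
    '{{execution_date.strftime("%Y-%m-%d")}}'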
Note that you might need to swap the double and single quotes if double quotes are used for other purposes in the RDBMS, for example:
    "{{execution_date.strftime('%Y-%m-%d')}}"
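To make the "two separate strings" point concrete, here is a rough sketch of how a SQL parser splits the unrendered text when it reaches Postgres as-is. The helper name text_outside_literals is made up for illustration, and the scanner is a simplification: it treats every single quote as a string delimiter and ignores doubled quotes ('') and dollar-quoting.

```python
from typing import List


def text_outside_literals(sql: str) -> List[str]:
    """Return the non-empty fragments of `sql` that fall outside single-quoted literals.

    Simplification (assumption): each single quote toggles "inside a literal";
    escaped quotes ('') and dollar-quoted strings are not handled.
    """
    parts = sql.split("'")
    # Even-indexed parts are outside a literal, odd-indexed parts are inside one.
    return [part for index, part in enumerate(parts) if index % 2 == 0 and part]


# The expression from the question, with single and with double quotes
# around the date format.
single_quoted = "'{{execution_date.strftime('%Y-%m-%d')}}'::date"
double_quoted = "'{{execution_date.strftime(\"%Y-%m-%d\")}}'::date"

print(text_outside_literals(single_quoted))
print(text_outside_literals(double_quoted))
```

With the single-quoted format, %Y-%m-%d ends up outside any string literal, so Postgres reads Y as an identifier (folded to lowercase), which matches the column "y" error. With double quotes the whole Jinja expression stays inside one literal.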
This is expected. template_fields is an attribute of the BaseOperator in Airflow, from which all operators inherit. This is why passing in a Jinja expression when using the PostgresOperator works just fine.
If you need to write a custom task, you need to render the template values explicitly. Something like this (untested), which I'm sure can be adapted to your function:
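A minimal sketch of explicit rendering, not the answerer's original snippet. It assumes the callable receives the Airflow execution context (for example via **context in a PythonOperator callable) and uses the operator's render_template(content, context) method on context["task"], rather than ti.render_templates(), because the SQL here is loaded from YAML inside the function instead of being passed as a templated field. The helper name render_sql is hypothetical.

```python
from typing import Any, Mapping


def render_sql(sql: str, context: Mapping[str, Any]) -> str:
    """Render Jinja expressions in `sql` using the running task's own renderer.

    Assumption: `context` is the Airflow execution context, whose "task" entry
    is the operator instance exposing render_template(content, context).
    """
    task = context["task"]
    return task.render_template(sql, context)


# Hypothetical use inside the function from the question:
#
# def prc_mymys_update(procedure: str, type_agg: str, **context):
#     ...
#     query_pg = render_sql(query_pg, context)  # '{{ds}}' becomes a real date
#     cur.execute(query_pg)
```

Rendering before cur.execute means Postgres only ever sees the final date string, not the Jinja expression. Note that execution_date is only present in the context on Airflow versions that still provide it.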
The ti kwarg represents the Airflow Task Instance and is directly accessible as part of the execution context pushed to every task in Airflow. That object has a render_templates() method which will translate the Jinja expression to a value.
If the PostgresOperator doesn't fit your needs, you can always subclass the operator and tailor it accordingly.
Also, the sql string itself has single quotes which cause string parsing issues, as you're seeing:
'{{execution_date.strftime('%Y-%m-%d')}}'
Should be something like:
'{{execution_date.strftime("%Y-%m-%d")}}'