We’ve set up AWS Secrets Manager as a secrets backend for Airflow (AWS MWAA) as described in their documentation. Unfortunately, it is nowhere explained where the secrets are then to be found and how they are to be used. When I supply a conn_id to a task in a DAG, we see two errors in the task logs: ValueError: Invalid IPv6 URL and airflow.exceptions.AirflowNotFoundException: The conn_id redshift_conn isn't defined. What’s even more surprising is that retrieving variables stored the same way with Variable.get('my_variable_id') works just fine.
The question is: am I wrong to expect that the conn_id can be passed directly to operators, as in SomeOperator(conn_id='conn-id-in-secretsmanager')? Must I retrieve the connection manually each time I want to use it? I don’t want to have to run something like read_from_aws_sm_fn in the code below every time beforehand…
By the way, neither the connection nor the variable shows up in the Airflow UI.
Having stored a secret named airflow/connections/redshift_conn (and, on the side, one named airflow/variables/my_variable_id), I expect the connection to be found and used when constructing RedshiftSQLOperator(task_id='mytask', redshift_conn_id='redshift_conn', sql='SELECT 1'). But this results in the errors above.
I am able to retrieve the Redshift connection manually in a DAG with a separate task, but I don’t think that is how Secrets Manager is supposed to be used in this case.
The example DAG is below:
from airflow import DAG, settings, secrets
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.models.baseoperator import chain
from airflow.models import Connection, Variable
from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
from datetime import timedelta

sm_secret_id_name = 'airflow/connections/redshift_conn'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': days_ago(1),
    'retries': 1,
}

def read_from_aws_sm_fn(**kwargs):  # from AWS example code
    # Set up a Secrets Manager client and fetch the raw secret value.
    hook = AwsBaseHook(client_type='secretsmanager')
    client = hook.get_client_type('secretsmanager')
    response = client.get_secret_value(SecretId=sm_secret_id_name)
    myConnSecretString = response["SecretString"]
    print(myConnSecretString[:15])
    return myConnSecretString

def get_variable(**kwargs):
    # Variable.get() transparently falls back to the secrets backend.
    my_var_value = Variable.get('my_variable_id')
    print('variable:')
    print(my_var_value)
    return my_var_value

with DAG(
    dag_id='redshift_test_dag',
    default_args=default_args,
    dagrun_timeout=timedelta(minutes=10),
    start_date=days_ago(1),
    schedule_interval=None,
    tags=['example'],
) as dag:
    read_from_aws_sm_task = PythonOperator(
        task_id="read_from_aws_sm",
        python_callable=read_from_aws_sm_fn,
        provide_context=True,
    )  # works fine

    query_redshift = RedshiftSQLOperator(
        task_id='query_redshift',
        redshift_conn_id='redshift_conn',
        sql='SELECT 1;',
    )  # results in the errors above :-(

    try_to_get_variable_value = PythonOperator(
        task_id='get_variable',
        python_callable=get_variable,
        provide_context=True,
    )  # works fine!
3 Answers
The first step is defining the prefixes for connections and variables; if they are not defined, your secrets backend will not check for the secret:
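On MWAA this is done through the environment's Airflow configuration options; a minimal sketch, assuming the prefixes used in the question (adjust them to match your own secret names):

secrets.backend = airflow.providers.amazon.aws.secrets.secrets_manager.SecretsManagerBackend
secrets.backend_kwargs = {"connections_prefix": "airflow/connections", "variables_prefix": "airflow/variables"}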
Then, for the secrets/connections, you should store them under those prefixes, respecting the required fields for the connection. For example, for the connection my_postgress_conn, you should store it in the path airflow/connections/my_postgress_conn, with the JSON dict as a string (see the sketch below). And for the variables, you just need to store them in airflow/variables/<var_name>. Using Secrets Manager as a backend, you don’t need to change the way you use the connections or variables; they work the same way, because when looking up a connection/variable, Airflow follows a search path.
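A minimal sketch of such a secret value for my_postgress_conn, stored as a JSON string; the host and credentials are placeholders, and which fields are accepted (and whether JSON is supported at all) depends on the amazon provider version:

{
    "conn_type": "postgres",
    "login": "myuser",
    "password": "mypassword",
    "host": "myhost.example.com",
    "port": 5432,
    "schema": "mydb"
}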
The connection/variable will not show up in the UI.
The first error is related to the secret, and the second error is due to the connection not existing in the Airflow UI. There are two formats for storing connections in Secrets Manager (depending on the AWS provider version installed); the IPv6 URL error could mean that the connection is not being parsed correctly. Here is a link to the provider docs.
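For illustration, the older of the two formats stores the whole connection as a single URI string; the same placeholder connection as above would look like:

postgres://myuser:mypassword@myhost.example.com:5432/mydb

If the stored string is neither a valid URI nor (on newer provider versions) a JSON dict, parsing may fail with errors like the Invalid IPv6 URL above.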
If you are using MWAA and you have integrated Secrets Manager as described in the first answer, then you do not have to do anything special to access connections (or variables). I wrote a blog post on this here, and whilst it covers more than just the AWS Secrets Manager integration, I dive into that too.
Let's say you want to add a new connection to Redshift and reference it within your DAGs as "redshift_default". You would create a secret like this:
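A sketch using the AWS CLI; the secret name must sit under your configured connections prefix, and the cluster endpoint and credentials below are placeholders:

aws secretsmanager create-secret \
    --name airflow/connections/redshift_default \
    --secret-string "postgres://awsuser:mypassword@my-cluster.abc123xyz.eu-west-1.redshift.amazonaws.com:5439/dev"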
If you needed to add additional parameters in EXTRA, you would append them to the connection string as ?{param}={value}, like so:
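For instance, appending a hypothetical sslmode parameter to the same placeholder URI:

postgres://awsuser:mypassword@my-cluster.abc123xyz.eu-west-1.redshift.amazonaws.com:5439/dev?sslmode=require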
You do not need to do anything else in your DAG. With the AWS Secrets Manager integration in place, secrets created like this will appear to your code exactly as if they had been defined as Connections in the UI. Remember, though: they will NOT appear in the UI.
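Under those assumptions, the operator from the question then just references the secret by its suffix; a minimal sketch (the dag_id and task_id are illustrative):

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator
from airflow.utils.dates import days_ago

with DAG(
    dag_id='redshift_secrets_example',
    start_date=days_ago(1),
    schedule_interval=None,
) as dag:
    # 'redshift_default' resolves to the secret airflow/connections/redshift_default
    query = RedshiftSQLOperator(
        task_id='query_redshift',
        redshift_conn_id='redshift_default',
        sql='SELECT 1;',
    )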