
We've set up AWS Secrets Manager as a secrets backend for Airflow (AWS MWAA) as described in their documentation. Unfortunately, it is nowhere explained where the secrets are to be found and how they are to be used. When I supply a conn_id to a task in a DAG, two errors show up in the task logs: ValueError: Invalid IPv6 URL and airflow.exceptions.AirflowNotFoundException: The conn_id redshift_conn isn't defined. What's even more surprising is that retrieving variables stored the same way with Variable.get('my_variable_id') works just fine.

The question is: Am I wrongly expecting that the conn_id can be directly passed to operators as SomeOperator(conn_id='conn-id-in-secretsmanager')? Must I retrieve the connection manually each time I want to use it? I don’t want to run something like read_from_aws_sm_fn in the code below every time beforehand…

Btw, neither the connection nor the variable shows up in the Airflow UI.

Having stored a secret named airflow/connections/redshift_conn (and, on the side, one named airflow/variables/my_variable_id), I expect the connection to be found and used when constructing RedshiftSQLOperator(task_id='mytask', redshift_conn_id='redshift_conn', sql='SELECT 1'). But this results in the errors above.
I am able to retrieve the Redshift connection manually in a DAG with a separate task, but I don't think that is how Secrets Manager is supposed to be used in this case.

The example DAG is below:

from datetime import timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.providers.amazon.aws.hooks.base_aws import AwsBaseHook
from airflow.models import Variable
from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator

# Name of the secret holding the Redshift connection in AWS Secrets Manager
sm_secret_id_name = 'airflow/connections/redshift_conn'

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'retries': 1,
}


def read_from_aws_sm_fn(**kwargs):  # from AWS example code
    # Read the raw secret string directly via a boto3 Secrets Manager client
    hook = AwsBaseHook(client_type='secretsmanager')
    client = hook.get_client_type('secretsmanager')
    response = client.get_secret_value(SecretId=sm_secret_id_name)
    my_conn_secret_string = response["SecretString"]

    print(my_conn_secret_string[:15])

    return my_conn_secret_string


def get_variable(**kwargs):
    # Resolved through the secrets backend (airflow/variables/my_variable_id)
    my_var_value = Variable.get('my_variable_id')
    print('variable:')
    print(my_var_value)
    return my_var_value


with DAG(
        dag_id='redshift_test_dag',
        default_args=default_args,
        dagrun_timeout=timedelta(minutes=10),
        start_date=days_ago(1),
        schedule_interval=None,
        tags=['example'],
) as dag:
    read_from_aws_sm_task = PythonOperator(
        task_id="read_from_aws_sm",
        python_callable=read_from_aws_sm_fn,
    )  # works fine

    query_redshift = RedshiftSQLOperator(
        task_id='query_redshift',
        redshift_conn_id='redshift_conn',
        sql='SELECT 1;',
    )  # results in the above errors :-(

    try_to_get_variable_value = PythonOperator(
        task_id='get_variable',
        python_callable=get_variable,
    )  # works fine!

3 Answers


  1. The first step is defining the prefixes for connections and variables; if they are not defined, your secrets backend will not check for the secret:

    secrets.backend_kwargs : {"connections_prefix" : "airflow/connections", "variables_prefix" : "airflow/variables"}
    

    Then, for the secrets/connections, store them under those prefixes, respecting the required fields for the connection.

    For example, for the connection my_postgres_conn:

    {
        "conn_type": "postgres",
        "login": "user",
        "password": "pass",
        "host": "host",
        "extra": "{\"key\": \"val\"}"
    }
    

    You should store it under the path airflow/connections/my_postgres_conn, with the JSON dict as a string.
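
    For illustration, a minimal boto3 sketch that stores the dict above as a JSON string (the region name is just a placeholder):

    import json
    import boto3

    # Hypothetical region; use the region of your MWAA environment
    client = boto3.client('secretsmanager', region_name='eu-west-1')
    client.create_secret(
        Name='airflow/connections/my_postgres_conn',
        SecretString=json.dumps({
            'conn_type': 'postgres',
            'login': 'user',
            'password': 'pass',
            'host': 'host',
            'extra': json.dumps({'key': 'val'}),
        }),
    )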

    And for the variables, you just need to store them in airflow/variables/<var_name>.
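
    Once that is in place, the lookup works exactly like a connection defined in the UI. A minimal sketch, assuming the my_postgres_conn secret above and a hypothetical variable stored at airflow/variables/my_var:

    from airflow.hooks.base import BaseHook
    from airflow.models import Variable

    # Resolved via Airflow's search path: secrets backend, then environment variables, then the metastore
    conn = BaseHook.get_connection('my_postgres_conn')
    print(conn.conn_type, conn.host, conn.login)

    # Variables are looked up the same way
    print(Variable.get('my_var'))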

  2. The question is: Am I wrongly expecting that the conn_id can be directly passed to operators as SomeOperator(conn_id=’conn-id-in-secretsmanager’)? Must I retrieve the connection manually each time I want to use it? I don’t want to run something like read_from_aws_sm_fn in the code below every time beforehand…

    Using Secrets Manager as a backend, you don't need to change the way you use connections or variables. They work the same way: when looking up a connection/variable, Airflow follows a search path (secrets backend, then environment variables, then the metastore database).

    Btw, neither the connection nor the variable show up in the Airflow UI.

    The connection/variable will not show up in the UI.

    ValueError: Invalid IPv6 URL and airflow.exceptions.AirflowNotFoundException: The conn_id redshift_conn isn’t defined

    The first error is related to parsing the secret, and the second is due to the connection not being defined in the Airflow metastore (which is what the UI shows).

    There are two formats for storing connections in Secrets Manager, a URI or a JSON dict, depending on the installed Amazon provider version; the Invalid IPv6 URL error could mean the connection string is not being parsed correctly. Here is a link to the provider docs.
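
    If you want to see what Airflow actually parses out of your secret, a small debugging sketch along these lines (run anywhere the secrets backend is configured, e.g. inside a PythonOperator) can help; it uses the redshift_conn id from the question:

    from airflow.hooks.base import BaseHook

    # Goes through the configured secrets backend, exactly like an operator would;
    # a malformed secret string is where errors such as "ValueError: Invalid IPv6 URL" surface
    conn = BaseHook.get_connection('redshift_conn')

    print(conn.conn_type, conn.host, conn.port, conn.schema)
    print(conn.get_uri())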

  3. If you are using MWAA and you have integrated Secrets Manager as described in the first answer, then you do not have to do anything special to access Connections (or Variables).

    I wrote a blog post on this here and whilst it does cover more than just the AWS Secrets integration, I dive into that too.

    Let's say you want to add a new connection to Redshift and reference it within your DAGs as "redshift_default". You would create a secret like this:

    aws secretsmanager create-secret --name airflow/connections/redshift_default --description "Connect to Amazon Redshift Cluster BuildON" --secret-string "postgres://awsuser:XXXXX@airflow-summit.cq7hpqttbcoc.eu-west-1.redshift.amazonaws.com:5439/mwaa" --region {your-region}
    

    If you needed to add additional parameters in EXTRA, you would append them as query parameters, ?{param}={value}, like so:

    aws secretsmanager create-secret --name airflow/connections/redshift_default --description "Connect to Amazon Redshift Cluster BuildON" --secret-string "postgres://awsuser:XXXXX@airflow-summit.cq7hpqttbcoc.eu-west-1.redshift.amazonaws.com:5439/mwaa?param1=value1&param2=value2" --region {your-region}
    

    You do not need to do anything else in your DAG. With the AWS Secrets Manager integration in place, secrets created like this appear to your code as if they were Connections defined in the UI. Remember though, they will NOT appear in the UI.
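
    For illustration, a minimal DAG sketch that relies on that secret; the dag_id is hypothetical, and the RedshiftSQLOperator import path matches the one used in the question (newer Amazon provider versions may expose it from a different module):

    from datetime import timedelta

    from airflow import DAG
    from airflow.utils.dates import days_ago
    from airflow.providers.amazon.aws.operators.redshift import RedshiftSQLOperator

    with DAG(
        dag_id='redshift_secrets_example',
        start_date=days_ago(1),
        schedule_interval=None,
        dagrun_timeout=timedelta(minutes=10),
    ) as dag:
        # 'redshift_default' resolves to the secret stored at
        # airflow/connections/redshift_default via the Secrets Manager backend
        RedshiftSQLOperator(
            task_id='query_redshift',
            redshift_conn_id='redshift_default',
            sql='SELECT 1;',
        )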
