skip to Main Content

I have a MWAA environment and I have to create another one by Terraform. The environment creation is not an issue, but the ‘metadata’ of my old enviroment. I want to import all variables and connections, programatically, but I haven’t figured out so far.

I tried to change a few things in this approach, using POST request to MWAA CLI, but I only get a timeout. And I also tried this one.

Has anyone ever done this? I’d like to import variables and connections to my MWAA environment everytime I create it.

2

Answers


  1. Chosen as BEST ANSWER

    It turns out that I was getting a timeout and couldn't find out what was happening. To solve this situations I'll save my variables and connections in Secrets Managers and retrieve them at runtime with the DAG.


  2. I understand that you want to copy the old MWAA variables into the new instance, you can refer to the following doc which contains all of the supported commands by AWS CLI https://docs.aws.amazon.com/mwaa/latest/userguide/airflow-cli-command-reference.html.

    The variables & connections set & get commands are available.

    Unfortunately, not all commands are supported through the AWS CLI. Please note that you need to authenticate to AWS MWAA CLI, obtain the token, and then you will be able to execute the supported commands. Please refer to the following doc for more information https://docs.aws.amazon.com/mwaa/latest/userguide/call-mwaa-apis-cli.html

    Below are code snippets on how to obtain the CLI token and execute a command.

    import boto3
    import requests
    
    AWS_MWAA_HOSTNAME_URL = ""
    
    client = boto3.client('mwaa')
    
    
    def generate_cli_token(mwaa_env_name):
        return client.create_cli_token(
            Name=mwaa_env_name
        )
    
    
    def execute_mwaa_command(token, domain, command):
        res = requests.post(
            AWS_MWAA_HOSTNAME_URL.format(domain=domain),
            headers={
                'Authorization': f'Bearer {token}',
                'Content-Type': 'text/plain'
            },
            data=command
        )
        if res.status_code == 200:
            return res.json()
        else:
            print(res.text)
            res.raise_for_status()
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search