skip to Main Content

I have this lambda function that I got from here: https://github.com/endre-synnes/python_aws_course/blob/main/lambda_intro/04_secrets_and_databases_and_stuff/17_rotate_secrets.py

import boto3

def lambda_handler(event, context):
    """Handle one rotation step for a Secrets Manager secret.

    Reads ``SecretId``, ``ClientRequestToken`` and ``Step`` from *event*,
    refuses to continue when the secret's metadata reports rotation as
    disabled, and then delegates to the handler for the requested step.
    *context* is not used.

    Raises:
        ValueError: if rotation is not enabled for the secret, or if
            ``Step`` is not one of the four known step names.
    """
    secret_arn = event['SecretId']
    request_token = event['ClientRequestToken']
    requested_step = event['Step']

    secrets_manager = boto3.client('secretsmanager')

    # Rotation must be switched on for this secret before any step runs
    description = secrets_manager.describe_secret(SecretId=secret_arn)
    if not description['RotationEnabled']:
        raise ValueError("Secret %s is not enabled for rotation" % secret_arn)

    # Map each step name to the function that implements it
    step_handlers = {
        "createSecret": create_secret,
        "setSecret": set_secret,
        "testSecret": test_secret,
        "finishSecret": finish_secret,
    }
    if requested_step not in step_handlers:
        raise ValueError("Invalid step parameter")

    step_handlers[requested_step](secrets_manager, secret_arn, request_token)


def create_secret(service_client, arn, token):
    """Stage a newly generated password as the AWSPENDING version of a secret.

    If a version with id *token* is already staged as AWSPENDING nothing is
    written. Otherwise a random password is requested from the service and
    stored under *token* with the AWSPENDING stage.

    Note that the stored SecretString is the raw password only: any other
    content of the current secret value is not carried over.

    Args:
        service_client: Secrets Manager client used for every call.
        arn: identifier of the secret being rotated.
        token: client request token, used as the new version's id.
    """
    # Result is unused: this call only verifies that a current version can
    # be read; any client error propagates to the caller.
    service_client.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")

    # Now try to get the secret version, if that fails, put a new secret
    try:
        service_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage="AWSPENDING")
    except service_client.exceptions.ResourceNotFoundException:
        # Generate a random password that contains none of: / @ " ' \
        # (the quote and backslash must be escaped inside the literal).
        passwd = service_client.get_random_password(ExcludeCharacters='/@"\'\\')
        # Put the secret
        service_client.put_secret_value(
            SecretId=arn,
            ClientRequestToken=token,
            SecretString=passwd['RandomPassword'],
            VersionStages=['AWSPENDING'],
        )


def set_secret(service_client, arn, token):
    """Placeholder for the "setSecret" step: only prints a notice.

    The three parameters mirror the other step handlers and are not used.
    """
    notice = "No database user credentials to update..."
    print(notice)


def test_secret(service_client, arn, token):
    print("No need to testing against any service...")


def finish_secret(service_client, arn, token):
    """Move the AWSCURRENT stage onto the version identified by *token*.

    Looks up the secret's version-to-stages mapping, finds the first
    version labelled AWSCURRENT and, unless that version is already
    *token*, asks the service to move the label from it to *token*.
    Does nothing when no version carries AWSCURRENT.

    Args:
        service_client: Secrets Manager client used for every call.
        arn: identifier of the secret being rotated.
        token: id of the version that should become current.
    """
    description = service_client.describe_secret(SecretId=arn)

    for version_id, stages in description["VersionIdsToStages"].items():
        if "AWSCURRENT" not in stages:
            continue

        # Only the first AWSCURRENT holder is considered
        if version_id != token:
            service_client.update_secret_version_stage(
                SecretId=arn,
                VersionStage="AWSCURRENT",
                MoveToVersionId=token,
                RemoveFromVersionId=version_id,
            )
        return

   

Instead of overwriting the entire ‘Secret String’ with a random password, how would I replace only the value of the one key called "Password"?

My entire Secret String is the following:

{"Password":"*********","Username":"********"}

Thanks in advance for any help! (Sorry I am a newb at coding!)

I tried a few ChatGPT suggestions, but none of them seemed to work.

import json
import random
import string

import boto3

def lambda_handler(event, context):
    """Replace only the "Password" key of a JSON secret with a new password.

    The secret named by ``event['SecretId']`` is read, its SecretString is
    parsed as a JSON object, the "Password" entry is set to a freshly
    generated 16-character value, and the re-encoded JSON is written back
    with ``put_secret_value``. All other keys are left untouched.
    *context* is not used.

    Returns:
        dict: ``{"new_secret_value": <the JSON string that was stored>}``.

    Raises:
        ValueError: if the SecretString is not valid JSON
            (``json.JSONDecodeError`` is a ``ValueError`` subclass).
    """
    # Set the name of the secret
    secret_name = event['SecretId']

    # Connect to AWS Secrets Manager
    session = boto3.session.Session()
    client = session.client(service_name='secretsmanager')

    # Retrieve the current secret value and decode it. Parsing the JSON
    # (instead of splitting the text on '=') is what lets a single key be
    # changed without disturbing the rest of the secret.
    get_secret_value_response = client.get_secret_value(SecretId=secret_name)
    secret = json.loads(get_secret_value_response['SecretString'])

    # Define the complexity of the new secret value
    password_length = 16
    per_class = 4
    character_classes = [
        string.ascii_uppercase,
        string.ascii_lowercase,
        string.digits,
        "!@#$%^&*()_+-=",
    ]

    # SystemRandom draws from the OS entropy source; the module-level
    # random functions are not suitable for generating credentials.
    rng = random.SystemRandom()

    # Guarantee `per_class` characters from every class, then top up from
    # the combined alphabet if password_length asks for more.
    new_value_characters = []
    for alphabet in character_classes:
        new_value_characters.extend(rng.choices(alphabet, k=per_class))
    remaining = max(0, password_length - len(new_value_characters))
    new_value_characters.extend(rng.choices(''.join(character_classes), k=remaining))
    rng.shuffle(new_value_characters)
    new_value = ''.join(new_value_characters)

    # Replace only the Password entry and re-encode the whole object
    secret["Password"] = new_value
    new_secret_value = json.dumps(secret)

    # Update the secret value in AWS Secrets Manager
    client.put_secret_value(SecretId=secret_name, SecretString=new_secret_value)

    # NOTE(review): this returns the full secret string to the invoker, as
    # the original did -- confirm callers really need it, since the value
    # ends up wherever the invocation result is recorded.
    return {"new_secret_value": new_secret_value}

2

Answers


  1. Chosen as BEST ANSWER

    Posting the entire lambda function, thanks for the help @benmanson!!!

    import json
    import boto3
    
    def lambda_handler(event, context):
        """Run the rotation step named in *event* for the given secret.

        *event* must provide ``SecretId``, ``ClientRequestToken`` and
        ``Step``. The secret's metadata is fetched first and the call is
        rejected when rotation is not enabled. *context* is not used.

        Raises:
            ValueError: if rotation is not enabled for the secret, or if
                ``Step`` is not one of the four known step names.
        """
        secret_arn = event['SecretId']
        request_token = event['ClientRequestToken']
        step_name = event['Step']

        client = boto3.client('secretsmanager')

        # Refuse to do anything for a secret that has rotation turned off
        secret_info = client.describe_secret(SecretId=secret_arn)
        if not secret_info['RotationEnabled']:
            raise ValueError("Secret %s is not enabled for rotation" % secret_arn)

        # Look up the function implementing the requested step
        handler = {
            "createSecret": create_secret,
            "setSecret": set_secret,
            "testSecret": test_secret,
            "finishSecret": finish_secret,
        }.get(step_name)
        if handler is None:
            raise ValueError("Invalid step parameter")

        handler(client, secret_arn, request_token)
    
    
    def create_secret(service_client, arn, token):
        """Stage a copy of the current secret with a new "Password" value.

        The AWSCURRENT SecretString is parsed as a JSON object. If no
        version with id *token* is staged as AWSPENDING yet, a random
        password is requested from the service, written into the
        "Password" key, and the re-encoded JSON is stored under *token*
        with the AWSPENDING stage. All other keys are carried over as-is.

        Args:
            service_client: Secrets Manager client used for every call.
            arn: identifier of the secret being rotated.
            token: client request token, used as the new version's id.
        """
        response = service_client.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")

        # Convert from string to dictionary
        secret_value = json.loads(response["SecretString"])

        # Now try to get the secret version, if that fails, put a new secret
        try:
            service_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage="AWSPENDING")
        except service_client.exceptions.ResourceNotFoundException:
            # Generate a random password that contains none of: / @ " ' \
            # (the quote and backslash must be escaped inside the literal).
            passwd = service_client.get_random_password(ExcludeCharacters='/@"\'\\')
            # Update Password key
            secret_value["Password"] = passwd["RandomPassword"]
            # Convert back to a string
            secret_string = json.dumps(secret_value)
            # Put the secret
            service_client.put_secret_value(
                SecretId=arn,
                ClientRequestToken=token,
                SecretString=secret_string,
                VersionStages=['AWSPENDING'],
            )
    
    
    def set_secret(service_client, arn, token):
        """No-op "setSecret" step; prints a notice and returns.

        The arguments exist only so the signature matches the other steps.
        """
        text = "No database user credentials to update..."
        print(text)
    
    
    def test_secret(service_client, arn, token):
        print("No need to testing against any service...")
    
    
    def finish_secret(service_client, arn, token):
        """Promote the version identified by *token* to AWSCURRENT.

        The first version found carrying the AWSCURRENT label is treated
        as the current one. If it already is *token*, or if no version
        carries the label, nothing is changed; otherwise the label is
        moved from that version to *token*.

        Args:
            service_client: Secrets Manager client used for every call.
            arn: identifier of the secret being rotated.
            token: id of the version that should become current.
        """
        stages_by_version = service_client.describe_secret(SecretId=arn)["VersionIdsToStages"]

        # Lazily pick the first version labelled AWSCURRENT (None if absent)
        current_version = next(
            (
                version_id
                for version_id, stages in stages_by_version.items()
                if "AWSCURRENT" in stages
            ),
            None,
        )

        if current_version is None or current_version == token:
            return

        service_client.update_secret_version_stage(
            SecretId=arn,
            VersionStage="AWSCURRENT",
            MoveToVersionId=token,
            RemoveFromVersionId=current_version,
        )
    

  2. You can use the json module to parse the current SecretString value into a Python dictionary. Then, when you generate a new password, you replace the value of the intended key in the dictionary. Finally, you re-encode the dictionary as a JSON string and pass it to put_secret_value. It would be something like this:

    import json
    
    ...
    
    def create_secret(service_client, arn, token):
        """Create the AWSPENDING version, changing only the "Password" key.

        Reads the AWSCURRENT SecretString and decodes it as JSON. When no
        AWSPENDING version with id *token* exists, the "Password" entry is
        replaced by a service-generated random password and the whole
        object is stored again, as JSON, under *token* with the AWSPENDING
        stage. Every other key keeps its current value.

        Args:
            service_client: Secrets Manager client used for every call.
            arn: identifier of the secret being rotated.
            token: client request token, used as the new version's id.
        """
        response = service_client.get_secret_value(SecretId=arn, VersionStage="AWSCURRENT")

        # Convert from string to dictionary
        secret_value = json.loads(response["SecretString"])

        # Now try to get the secret version, if that fails, put a new secret
        try:
            service_client.get_secret_value(SecretId=arn, VersionId=token, VersionStage="AWSPENDING")
        except service_client.exceptions.ResourceNotFoundException:
            # Generate a random password that contains none of: / @ " ' \
            # (the quote and backslash must be escaped inside the literal).
            passwd = service_client.get_random_password(ExcludeCharacters='/@"\'\\')
            # Update Password key
            secret_value["Password"] = passwd["RandomPassword"]
            # Convert back to a string
            secret_string = json.dumps(secret_value)
            # Put the secret
            service_client.put_secret_value(
                SecretId=arn,
                ClientRequestToken=token,
                SecretString=secret_string,
                VersionStages=['AWSPENDING'],
            )
    
    
    Log in or sign up to reply.
Please sign up or log in to give your own answer.
Back To Top
Search