
I have 3 tables in Athena and I want to run 'SELECT * FROM table' for each of the 3 tables. Below is the basic Python code, which is executed for one table named 'client':

import json
import boto3
import time

def lambda_handler(event, context):
    client = boto3.client('athena')
    
    #Setup and perform query
    queryStart = client.start_query_execution(
            QueryString = 'SELECT * FROM client limit 10;',
            QueryExecutionContext = {
                'Database': 'dev'
            },
            ResultConfiguration = {
                'OutputLocation': 's3://bucketpune/athena-output/lambda/'
            }
        )
        
    #Observe results
    queryId = queryStart['QueryExecutionId']
    time.sleep(15)
    
    results = client.get_query_results(QueryExecutionId = queryId)
    for row in results['ResultSet']['Rows']:
        print(row)

I want this to loop over the other tables one by one, e.g. 'SELECT * FROM customer limit 10;'

I used a for loop as shown below, but it takes only the last table name, 'region'.

import json
import boto3
import time

table = ["client", "customer", "region"]
for x in table:
    def lambda_handler(event, context):
        client = boto3.client('athena')
        
        #Setup and perform query
        queryStart = client.start_query_execution(
                QueryString = 'SELECT * FROM ' + x + ' limit 10;',
                QueryExecutionContext = {
                    'Database': 'dev'
                },
                ResultConfiguration = {
                    'OutputLocation': 's3://bucketpune/athena-output/lambda/'
                }
            )
            
        #Observe results
        queryId = queryStart['QueryExecutionId']
        time.sleep(15)
        
        results = client.get_query_results(QueryExecutionId = queryId)
        for row in results['ResultSet']['Rows']:
            print(row)

2 Answers


  1. Your for x in table: must be inside your handler, not outside:

    import json
    import boto3
    import time
    
    table = ["client", "customer", "region"]
    client = boto3.client('athena')
    
    def lambda_handler(event, context):
        
        for x in table:
            #Setup and perform query
            queryStart = client.start_query_execution(
                    QueryString = 'SELECT * FROM ' + x + ' limit 10;',
                    QueryExecutionContext = {
                        'Database': 'dev'
                    },
                    ResultConfiguration = {
                        'OutputLocation': 's3://bucketpune/athena-output/lambda/'
                    }
                )
                
            #Observe results
            queryId = queryStart['QueryExecutionId']
            time.sleep(15)
            
            results = client.get_query_results(QueryExecutionId = queryId)
            for row in results['ResultSet']['Rows']:
                print(row)
    
  2. What @Marcin suggested is not bad, but since Athena is asynchronous you can take advantage of that (as long as you don't need the answer from a previous query). Athena works on a best-effort basis: it can take a second or half a minute to return your rows, so time.sleep(15) does not guarantee that Athena has finished processing. Here is an example of a Python function that fetches data from Athena using an incremental wait, so that it does not poll Athena every second and, in the worst case, hit API quotas:

    import time
    import boto3
    
    client = boto3.client('athena')
    
    def get_query_results(query):
        query_start = client.start_query_execution(
            QueryString=query,
            QueryExecutionContext={
                'Database': "{your_database}"
            },
            ResultConfiguration={
                'OutputLocation': "{s3_output_location}"
            }
        )
    
        query_id = query_start['QueryExecutionId']
        max_iterations = 10
        for i in range(max_iterations):
            print(f"{i} try")
            time.sleep(i + 1)  # incremental wait: 1s, 2s, 3s, ...
            query_execution = client.get_query_execution(QueryExecutionId=query_id)
            status = query_execution['QueryExecution']['Status']['State']
            print(f"status = {status}")
            # 'QUEUED'|'RUNNING'|'SUCCEEDED'|'FAILED'|'CANCELLED'
            if status == 'QUEUED' or status == 'RUNNING':
                continue
            if status == 'FAILED' or status == 'CANCELLED':
                raise Exception(f'Query ended as {status}')
            else:
                results = client.get_query_results(QueryExecutionId=query_id)
                rows = []
                for row in results['ResultSet']['Rows']:
                    rows.append(row)
                return rows
    
        print("Could not retrieve in specified time")
        return None
    
    

    Now, you can use that as a base to create a function that will first schedule all three (or n) queries and check for the answers periodically.
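    That two-phase approach (start every query up front, then poll periodically with an incremental wait) can be sketched as follows. This is only a sketch: the function name `run_queries` and the client-as-parameter design are mine, while the `dev` database, the output bucket, and the table names come from the question. Passing the Athena client in as an argument also makes the polling logic easy to exercise without AWS access.

    ```python
    import time

    def run_queries(athena, tables, database='dev',
                    output='s3://bucketpune/athena-output/lambda/'):
        """Schedule one query per table up front, then poll until all finish."""
        # Phase 1: start every query without waiting on any of them.
        pending = {}
        for table in tables:
            started = athena.start_query_execution(
                QueryString=f'SELECT * FROM {table} limit 10;',
                QueryExecutionContext={'Database': database},
                ResultConfiguration={'OutputLocation': output},
            )
            pending[table] = started['QueryExecutionId']

        # Phase 2: poll with an incremental wait until every query settles.
        results = {}
        for attempt in range(10):
            if not pending:
                break
            time.sleep(attempt + 1)  # incremental wait: 1s, 2s, 3s, ...
            for table, query_id in list(pending.items()):
                execution = athena.get_query_execution(QueryExecutionId=query_id)
                state = execution['QueryExecution']['Status']['State']
                if state in ('QUEUED', 'RUNNING'):
                    continue  # still working; check again on the next pass
                del pending[table]
                if state in ('FAILED', 'CANCELLED'):
                    raise Exception(f'Query on {table} ended as {state}')
                rows = athena.get_query_results(QueryExecutionId=query_id)
                results[table] = rows['ResultSet']['Rows']
        if pending:
            raise Exception(f'Queries still running for: {sorted(pending)}')
        return results
    ```

    Inside the Lambda handler you would then call `run_queries(boto3.client('athena'), ['client', 'customer', 'region'])`, and the three queries overlap instead of each paying its wait time in sequence.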
