skip to Main Content

I have a table named `xyz` in a MySQL Aurora database with the columns Table_Name, Table_ID, QuerySeq, sql_query and view_name.

Now I need to read this table in PySpark, run the SQL queries stored in the sql_query column (using spark.sql) in QuerySeq order, and register each query's result as a view named by the view_name column.
Then it needs to persist the view.

If view_name is empty for a row, the query should simply be run with spark.sql.

I tried looping through the sequence numbers and running the queries one by one.

2

Answers


  1. Chosen as BEST ANSWER
    from pyspark.sql import SparkSession
    
    def dynamic_sql():
        """Return the SQL text that ``process_table`` executes for FACT tables.

        This is a placeholder: replace the literal below with whatever logic
        builds the real query.
        """
        placeholder_query = "SELECT * FROM some_table WHERE some_condition"
        return placeholder_query
    
    def process_table(table_type, tab_id, spark=None):
        """Run the configured SQL steps for one table.

        For ``table_type == 'DIM'`` the query catalogue is read over JDBC from the
        ``rpt_dim`` table. It is restricted to rows whose ``table_id`` equals
        ``tab_id`` and executed in ascending ``Query sequence`` order:

        * rows with a non-blank ``View`` value: the query result is persisted and
          registered as a temporary view with that name, so later steps can
          reference it;
        * rows without a view name: the query is run as
          ``INSERT OVERWRITE TABLE <table_name> <sql_query>``.

        For ``table_type == 'FACT'`` the SQL returned by ``dynamic_sql()`` is run.
        Any other ``table_type`` does nothing.

        Args:
            table_type: ``'DIM'`` or ``'FACT'``.
            tab_id: value matched against the ``table_id`` column of ``rpt_dim``.
            spark: optional existing SparkSession. When omitted, a session is
                obtained with ``getOrCreate()`` and stopped before returning.
                Stopping it discards the temporary views and cached data created
                here. Pass your own session to keep them; a passed-in session is
                never stopped.
        """
        # Only stop a session this call created; a caller-supplied one is theirs.
        owns_session = spark is None
        if owns_session:
            spark = (
                SparkSession.builder
                .appName("Table Processing")
                .getOrCreate()
            )

        # Placeholder connection settings -- replace with real values.
        jdbc_url = "jdbc:mysql://your_database_host:your_port/your_database_name"
        connection_properties = {
            "user": "your_username",
            "password": "your_password",
            "driver": "com.mysql.jdbc.Driver",
        }

        try:
            if table_type == 'DIM':
                df = spark.read.jdbc(url=jdbc_url, table="rpt_dim", properties=connection_properties)
                filtered_df = df.filter(df["table_id"] == tab_id)

                # collect() guarantees no order, but the steps must run in
                # sequence: later queries may read views created by earlier ones.
                rows = sorted(filtered_df.collect(), key=lambda r: r["Query sequence"])

                for row in rows:
                    sql_query = row["sql_query"]
                    view_name = (row["View"] or "").strip()

                    if view_name:
                        # Persist the query result itself and expose it under the
                        # view name, so queries against the view reuse the cache.
                        result_df = spark.sql(sql_query)
                        result_df.persist()
                        result_df.createOrReplaceTempView(view_name)
                    else:
                        # No view requested: load the query result into the target table.
                        spark.sql(f"INSERT OVERWRITE TABLE {row['table_name']} {sql_query}")

            elif table_type == 'FACT':
                spark.sql(dynamic_sql())
        finally:
            if owns_session:
                spark.stop()
    
    # Example usage:
    # process_table('DIM', 1)  # Specify your table_type and tab_id for DIM
    # process_table('FACT', 2)  # Specify your table_type for FACT
    

  2. df_cfe = spark.sql("""
    SELECT 
        COALESCE(e.FNDG_TYPE_CD, 'ALL') AS FNDG_TYPE_CD,
        COALESCE(e.PROD_TYPE_CD, 'ALL') AS PROD_TYPE_CD,
        COALESCE(e.MBR_RLTNSHP_CD, 'ALL') AS MBR_RLTNSHP_CD, 
        COALESCE(e.RGSTRTN_STTS, 'ALL') AS RGSTRTN_STTS,
        COALESCE(e.GEN_NM, 'ALL') AS GEN_NM, 
        COALESCE(e.UNDRWRTG_ST_CD, 'ALL') AS UNDRWRTG_ST_CD,
        COALESCE(e.AGE_GRP, 'ALL') AS AGE_GRP,
        COALESCE(e.cntct_clsfctn_type_cd_val_Txt, 'ALL') AS cntct_clsfctn_type_cd_val_Txt,
        COALESCE(e.LAST_UPDT_CHNL_CD_VAL_TXT, 'ALL') AS LAST_UPDT_CHNL_CD_VAL_TXT,
        COALESCE(e.cntct_stts_cd_val_txt, 'ALL') AS cntct_stts_cd_val_txt,
        COALESCE(e.CNTCT_USBLTY, 'ALL') AS CNTCT_USBLTY,
        COALESCE(e.CNTCT_RNK, 'ALL') AS CNTCT_RNK,
        COUNT(DISTINCT e.MBR_UNIQ_CNT) AS MBR_UNIQ_CNT,
        SUM(e.INDV_CNT) AS INDV_CNT,
        SUM(e.SM_GRP_CNT) AS SM_GRP_CNT,
        SUM(e.LRG_GRP_CNT) AS LRG_GRP_CNT,
        SUM(e.NATL_CNT) AS NATL_CNT 
    FROM
    (
        SELECT 
            m.FNDG_TYPE_CD AS FNDG_TYPE_CD, 
            m.PROD_TYPE_CD AS PROD_TYPE_CD,
            m.MBR_RLTNSHP_CD AS MBR_RLTNSHP_CD,
            (CASE WHEN r.mcid IS NOT NULL THEN 'Y' ELSE 'N' END) AS RGSTRTN_STTS,
            m.GEN_NM AS GEN_NM,
            m.UNDRWRTG_ST_CD AS UNDRWRTG_ST_CD,
            (CASE WHEN m.Minor_ind = 'Y' THEN 'Minor' ELSE 'Adult' END) AS AGE_GRP,
            (CASE WHEN epo.cntct_clsfctn_type_cd_val_Txt IN ('Email', 'Phone') THEN epo.cntct_clsfctn_type_cd_val_Txt END) AS cntct_clsfctn_type_cd_val_Txt,
            epo.LAST_UPDT_CHNL_CD_VAL_TXT AS LAST_UPDT_CHNL_CD_VAL_TXT,
            epo.cntct_stts_cd_val_txt AS cntct_stts_cd_val_txt,
            (CASE WHEN epo.cntct_clsfctn_type_cd_val_txt = 'EMAIL' AND (epo.cntct_stts_cd_Val_txt IN ('ACTIVE', 'BLANK', 'UNK') OR epo.cntct_stts_cd_Val_txt IS NULL)
                THEN 'USABLE' ELSE 'NOT USABLE' END) AS CNTCT_USBLTY,
            (CASE WHEN epo.PRFRD_IND = 'Y' THEN 'PRIMARY' ELSE 'SECONDARY' END) AS CNTCT_RNK,
            (CASE WHEN epo.cntct_clsfctn_type_cd_val_txt = 'EMAIL' THEN m.mcid END) AS MBR_UNIQ_CNT,
            (CASE WHEN INDV_IND = 'Y' AND epo.cntct_clsfctn_type_cd_val_txt = 'EMAIL' THEN m.mcid END) AS INDV_CNT,
            (CASE WHEN SM_GRP_IND = 'Y' AND epo.cntct_clsfctn_type_cd_val_txt = 'EMAIL' THEN m.mcid END) AS SM_GRP_CNT,
            (CASE WHEN LRG_GRP_IND = 'Y' AND epo.cntct_clsfctn_type_cd_val_txt = 'EMAIL' THEN m.mcid END) AS LRG_GRP_CNT,
            (CASE WHEN NATL_IND = 'Y' AND epo.cntct_clsfctn_type_cd_val_txt = 'EMAIL' THEN m.mcid END) AS NATL_CNT
        FROM DIM_CMRCL_MBR m
        INNER JOIN
        (
            SELECT * FROM ehub_prfctr_ods_cntct WHERE cntct_clsfctn_type_cd_val_Txt = 'EMAIL' AND (end_dt > current_timestamp OR end_dt IS NULL)
        ) epo ON epo.mcid = m.mcid
        LEFT OUTER JOIN
        (
            SELECT DISTINCT mcid FROM smarthelp_reg1
        ) r ON r.mcid = epo.mcid
    ) e
    GROUP BY e.FNDG_TYPE_CD, e.PROD_TYPE_CD, e.MBR_RLTNSHP_CD, e.RGSTRTN_STTS, e.GEN_NM, e.UNDRWRTG_ST_CD, e.cntct_clsfctn_type_cd_val_Txt, e.LAST_UPDT_CHNL_CD_VAL_TXT,
        e.cntct_stts_cd_val_txt, e.CNTCT_USBLTY, e.CNTCT_RNK
    WITH CUBE
    """)
    
    Login or Signup to reply.
Please signup or login to give your own answer.
Back To Top
Search