
There is a requirement in my project to encrypt some PII columns while writing the data to a parquet file. An Azure Synapse PySpark notebook is being used to write the parquet file.

I am not finding any references on the internet. Any leads would be very helpful.

2 Answers


  1. Chosen as BEST ANSWER

    The code below can help encrypt and decrypt PII data using the Fernet library from the cryptography package. The generated Fernet key can be stored in a secret (for example, in Azure Key Vault) and pulled at any point in time for decryption.

    # Generate key using Fernet library
    from cryptography.fernet import Fernet
    
    key = Fernet.generate_key()
    
    # Define encrypt user defined function
    def encrypt_val(clear_text, MASTER_KEY):
        # Import inside the UDF so it is resolved on the Spark executors
        from cryptography.fernet import Fernet
        f = Fernet(MASTER_KEY)
        clear_text_b = bytes(clear_text, 'utf-8')
        cipher_text = f.encrypt(clear_text_b)
        return cipher_text.decode('ascii')
    
    # Define decrypt user defined function
    def decrypt_val(cipher_text, MASTER_KEY):
        from cryptography.fernet import Fernet
        f = Fernet(MASTER_KEY)
        clear_val = f.decrypt(cipher_text.encode()).decode()
        return clear_val
    
    # Create a sample dataframe
    from pyspark.sql import SparkSession
    
    spark = SparkSession.builder.appName('Enc').getOrCreate()
    
    columns = ["Name", "Phone"]
    data = [("Tom", "8989767656"), ("Jhon", "9997878676"), ("Sam", "8990344323")]
    
    df = spark.createDataFrame(data, columns)
    
    # Apply encryption  
    from pyspark.sql.functions import udf, lit, md5
    from pyspark.sql.types import StringType
    # Register UDF's
    encrypt = udf(encrypt_val, StringType())
    decrypt = udf(decrypt_val, StringType())
    # Fetch the key from a secret store instead of the inline key, for example:
    # encryptionKey = dbutils.preview.secret.get(scope = "encrypt", key = "fernetkey")
    encryptionKey = key.decode('utf-8')  # a Fernet key is URL-safe base64, so it can be stored as a string
    # Encrypt the data 
    #df = spark.table("Test_Encryption")
    encrypted = df.withColumn("Phone", encrypt("Phone", lit(encryptionKey)))
    encrypted.show()
    
    # Apply decryption
    decrypted = encrypted.withColumn("Phone", decrypt("Phone", lit(encryptionKey)))
    decrypted.show()
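
    In an Azure Synapse notebook, the Fernet key can be pulled from Azure Key Vault rather than generated inline, and the encrypted dataframe can then be written to a parquet file. A minimal sketch, assuming mssparkutils is available in the Synapse runtime; the vault name, secret name, and ADLS path below are placeholders:

    # Pull the Fernet key from Azure Key Vault (vault and secret names are placeholders)
    from notebookutils import mssparkutils
    encryptionKey = mssparkutils.credentials.getSecret("my-keyvault", "fernetkey")
    
    # Encrypt the PII column and write the result to parquet in ADLS Gen2
    encrypted = df.withColumn("Phone", encrypt("Phone", lit(encryptionKey)))
    encrypted.write.mode("overwrite").parquet(
        "abfss://<container>@<storageaccount>.dfs.core.windows.net/encrypted/customers"
    )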
    

  2. We are using the implementation below. You can try any of the strategies based on your scope.

    from pyspark.sql import functions as F
    from pyspark.sql import DataFrame
    
    class DataFrameMasker:
        def __init__(self, df: DataFrame):
            self.df = df
        
        def mask_columns(self, columns_to_mask: dict):
            masked_df = self.df
            
            for column_name, masking_strategy in columns_to_mask.items():
                if masking_strategy == "hash":
                    masked_df = self._mask_hash(masked_df, column_name)
                elif masking_strategy == "random":
                    masked_df = self._mask_random(masked_df, column_name)
                elif masking_strategy=="column_masker":
                    masked_df = self.mask_rows(masked_df, column_name)
                # Add more masking strategies here if needed
            
            return masked_df
        
        def _mask_hash(self, df: DataFrame, column_name: str) -> DataFrame:
            masked_df = df.withColumn(column_name, F.sha2(F.col(column_name), 256))
            return masked_df
        
        def _mask_random(self, df: DataFrame, column_name: str) -> DataFrame:
            masked_df = df.withColumn(column_name, F.expr("substring(md5(cast(rand() as string)), 0, 10)"))
            return masked_df
    
        def mask_rows(self, df: DataFrame, column_name: str) -> DataFrame:
            masked_df = df.withColumn(
                column_name,
                F.expr(f"CASE WHEN {column_name} IS NOT NULL THEN 'xxxxxxx' ELSE NULL END")
            )
            return masked_df
    
    # Create a sample DataFrame
    data = [("Subash", "K", "[email protected]"),
            ("Test", "Message", "[email protected]")]
    
    columns = ["first_name", "last_name", "email"]
    df = spark.createDataFrame(data, columns)
    
    # Initialize the DataFrameMasker with the DataFrame
    masker = DataFrameMasker(df)
    
    # Define columns to mask and their masking strategy
    columns_to_mask = {
        "first_name": "hash",
        "last_name": "random",
        "email": "column_masker"
    }
    
    # Mask specified columns
    masked_df = masker.mask_columns(columns_to_mask)
    
    # Show the masked DataFrame
    masked_df.show()
    

    You can also look at other masking methods, since hashing is a one-way process and the original values cannot be recovered from the hash.
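
    For example, a partial-masking strategy that keeps only the last four characters could be added alongside the strategies above. This is a minimal sketch; the helper below is illustrative and not part of the class shown earlier:

    from pyspark.sql import functions as F
    from pyspark.sql import DataFrame
    
    # Illustrative extra strategy: keep the last 4 characters, mask the rest
    def mask_partial(df: DataFrame, column_name: str) -> DataFrame:
        return df.withColumn(
            column_name,
            F.concat(F.lit("xxxxxx"), F.substring(F.col(column_name), -4, 4))
        )
    
    # Usage (illustrative): masked = mask_partial(df, "email")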


    You can also check Dynamic Data Masking in Synapse.
