
I want to read the Parquet files that I wrote from a Kafka topic using the readStream() method. However, when I read these Parquet files, the value column shows raw bytes instead of the actual message contents.

import glob

parquet_files = glob.glob("/home/ubuntu/tmp/output" + "/*.parquet")

parquet_df = spark.read.parquet(*parquet_files)

parquet_df.show()

Output:

+----+--------------------+-----------+---------+------+--------------------+-------------+
| key|               value|      topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-----------+---------+------+--------------------+-------------+
|null|[22 7B 27 74 69 6...|electricRaw|        0|   127|2023-06-03 22:05:...|            0|
|null|[22 7B 27 74 69 6...|electricRaw|        0|   124|2023-06-03 22:04:...|            0|
|null|[22 7B 27 74 69 6...|electricRaw|        0|   125|2023-06-03 22:04:...|            0|
|null|[22 7B 27 74 69 6...|electricRaw|        0|   126|2023-06-03 22:04:...|            0|
+----+--------------------+-----------+---------+------+--------------------+-------------+

If I select only the value column as below, it still shows up as raw bytes rather than readable text.

df = parquet_df.select("value").withColumnRenamed("value", "new_value")
df.show()

Output:

+--------------------+
|           new_value|
+--------------------+
|[22 7B 27 74 69 6...|
|[22 7B 27 74 69 6...|
|[22 7B 27 74 69 6...|
|[22 7B 27 74 69 6...|
+--------------------+

My goal is to get the real data in the value column. How can I do it?

Thanks in advance.

2 Answers


  1. To see the full content of a column in Spark, pass truncate=False to show():

    df.show(truncate=False)
    
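  2. The value column is a byte array (Spark's binary type), so you need to decode it to a string. Try this:

    from pyspark.sql.functions import udf

    # Decode each binary value as UTF-8 text (the UDF returns a string by default)
    byte_array_to_ascii = udf(lambda x: bytearray(x).decode('utf-8'))
    df = df.withColumn("ascii_value", byte_array_to_ascii("value"))
    df.show()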
    

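    If that doesn't work, it is probably because of the format of the value column. In that case, try this:

    from pyspark.sql import functions
    from pyspark.sql.functions import col, udf

    # Convert the column to a hex string first, then decode that hex string as UTF-8
    hex_to_ascii = udf(lambda x: bytearray.fromhex(''.join(filter(str.isalnum, x))).decode('utf-8'))
    df = df.withColumn("value", functions.hex(col("value"))).withColumn("ascii_value", hex_to_ascii("value"))
    df.show()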
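
To check what the decoding above does, here is a minimal sketch in plain Python. It assumes the payload is UTF-8 text and uses only the leading bytes visible in the truncated output from the question. The names SAMPLE_HEX, decode_bytes and decode_value_column are made up for illustration. The second function is a possible alternative that avoids a UDF by casting the binary column to a string; I have not run it against the asker's data.

```python
# Leading bytes copied from the truncated show() output in the question.
SAMPLE_HEX = "22 7B 27 74 69"


def decode_bytes(raw):
    """Decode a Kafka value (bytes or bytearray) as UTF-8 text."""
    return bytes(raw).decode("utf-8")


def decode_value_column(df, out_col="value_str"):
    """Hypothetical helper: add a string column by casting the binary
    "value" column, instead of going through a Python UDF.

    pyspark is imported lazily so the pure-Python part runs without Spark.
    """
    from pyspark.sql.functions import col

    return df.withColumn(out_col, col("value").cast("string"))


if __name__ == "__main__":
    # bytes.fromhex ignores the spaces between the hex pairs.
    raw = bytes.fromhex(SAMPLE_HEX)
    print(decode_bytes(raw))
```

With Spark available, something like decode_value_column(parquet_df).show(truncate=False) should display the decoded text in a new value_str column, assuming the messages really are UTF-8.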
    