
I have a log in the format below (JSON/Ion) where the value of time_of_initial_request is a timestamp, but without double quotes.

{post_handler_type:"trace",
default_marketplace:"prod_iad",
request_id:"5MTB3656X0JRL38R6LJY",
rest_uri:"productApi/feature",
client_logging_id:"PredictionMetadata",
time_of_initial_request:2023-05-24T15:00:00.577Z
}

Since the field (key) names do not have quotes, I am using the allowUnquotedFieldNames option to read it.

val JsonRaw = spark.read.option("allowUnquotedFieldNames", "true")
              .option("inferSchema", "true")
              .json("s3://my-bucket/prediction-test/2023-05-24/")

JsonRaw.createOrReplaceTempView("prediction")
val df2 = spark.sql("select request_id from prediction")
df2.show()

But it is throwing an error and flagging the rows as corrupt records.

As a sanity check, I loaded a sample file that has double quotes around the value, time_of_initial_request:"2023-05-24T15:00:00.577Z", and my code works fine and returns results.

Actually, I don’t even need the "time_of_initial_request" field for my use case, so I am happy to ignore/drop it.

Since I will be fetching the log using an S3 manifest, I cannot change or correct the log format at the source.

Any guidance on how to handle this in Scala?

2 Answers


  1. I don’t think there is anything Spark can do for you just by adding a flag or some configuration. The problem is that you have an unquoted value which is a string; that is not valid JSON. The value also contains a :, which is the same symbol used to split the key from the value.

    I think you would need to do some manual parsing. Maybe read the file as plain text, split it line by line, and apply some custom deserialization logic. You could use a JSON library (there are tons of them, such as jsoniter, circe, play-json, spray-json, upickle, etc.) or use a regex to extract keys and values.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder
      .master("local")
      .appName("Simple Application")
      .getOrCreate()
    
    import spark.implicits._
    
    val jsonDataFrame = spark
      .read
      .text(jsonFile) // jsonFile: the path to your log files
      .as[String]
      .map(jsonFromS3 => ???) // here you have to apply the custom parsing logic
    

    If you can guarantee that the file will always have the same structure, that is, a JSON object with exactly those keys and values, it could be easier; otherwise you would need to do some extra work. A possible shape for that custom logic is sketched below.
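    For illustration, here is a minimal sketch of that custom logic using a regex to pull out only request_id, which is the single field the question needs. It assumes request_id and its quoted value always appear together on one line of the raw text; the pattern itself is an assumption, not something confirmed by the question.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder
      .master("local")
      .appName("Simple Application")
      .getOrCreate()

    import spark.implicits._

    // Matches lines like: request_id:"5MTB3656X0JRL38R6LJY",
    val requestIdPattern = """request_id\s*:\s*"([^"]*)"""".r

    val requestIds = spark.read
      .text("s3://my-bucket/prediction-test/2023-05-24/") // path from the question
      .as[String]
      .flatMap(line => requestIdPattern.findFirstMatchIn(line).map(_.group(1)).toList)
      .toDF("request_id")

    requestIds.show()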

  2. You can use a regex to fix your text file and convert it to valid JSON before reading it into a Spark DataFrame.

    You can check this answer here. A minimal sketch of that approach:
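    This sketch assumes the only unquoted value is the ISO-8601 timestamp and that each record sits on a single line in the real files (spark.read.json expects line-delimited JSON by default); the regex and app name are illustrative.

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder
      .master("local")
      .appName("Fix and read log")
      .getOrCreate()

    import spark.implicits._

    val raw = spark.read
      .text("s3://my-bucket/prediction-test/2023-05-24/") // path from the question
      .as[String]

    // Wrap the bare ISO-8601 timestamp in double quotes so each record becomes valid JSON.
    val fixed = raw.map(_.replaceAll(
      """:(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+Z)""",
      """:"$1""""))

    // allowUnquotedFieldNames still takes care of the unquoted keys.
    val jsonDf = spark.read
      .option("allowUnquotedFieldNames", "true")
      .json(fixed)

    jsonDf.select("request_id").show()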
