
I am trying to load multiple documents into a MongoDB collection using Databricks PySpark, and while loading I populate an updateDate field as well. But after loading I can see that the updateDate field's data type is string instead of date.

Here is the code I am using for the timestamp:

import datetime

current_timestamp_utc = datetime.datetime.now(datetime.timezone.utc)
formatted_timestamp = current_timestamp_utc.strftime("%Y-%m-%dT%H:%M:%S")
timezone_offset = current_timestamp_utc.strftime("%z")
formatted_timestamp = (
    formatted_timestamp + ".000" + timezone_offset[:-2] + ":" + timezone_offset[-2:]
)

print(formatted_timestamp)

Result: 2024-04-03T07:33:52.000+00:00
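As an aside, the same string can be produced in one call with datetime.isoformat(), which already emits the millisecond part and the "+00:00" offset for an aware UTC datetime:

```python
import datetime

# One-call equivalent of the strftime/concatenation above:
# millisecond precision plus the "+00:00" offset.
now_utc = datetime.datetime.now(datetime.timezone.utc)
print(now_utc.isoformat(timespec="milliseconds"))
```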

As per the result it looks fine, but after loading into MongoDB it is displayed as a String instead of a Date.

So could you please help me with how to load the documents so that the field has a date data type?
I have used the updateMany() method to change the string to a date data type. Is this the right approach, or is there an I/O or performance impact when using updateMany()? Please suggest.

2 Answers


  1. You can get the current time directly using Spark SQL date-time functions as follows:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import date_format, current_timestamp
    
    spark = SparkSession.builder.getOrCreate()
    
    spark.sql("""select date_format(current_timestamp(), "yyyy-MM-dd'T'HH:mm:ss.SSSxxx") as updateDate""").show(truncate=False)
    
    Output:
    +-----------------------------+
    |updateDate                   |
    +-----------------------------+
    |2024-04-04T09:04:35.865+00:00|
    +-----------------------------+
    
    Schema:
    root
     |-- updateDate: string (nullable = false)
    

    If you notice the schema, updateDate is a string and you can convert it into a timestamp using to_timestamp() as follows:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import date_format, current_timestamp, to_timestamp
    
    spark = SparkSession.builder.getOrCreate()
    
    spark.sql("""select to_timestamp(date_format(current_timestamp(), "yyyy-MM-dd'T'HH:mm:ss.SSSxxx")) as updateDate""").show(truncate=False)
    
    Output:
    +-----------------------+
    |updateDate             |
    +-----------------------+
    |2024-04-04 09:04:12.703|
    +-----------------------+
    
    Schema:
    root
     |-- updateDate: timestamp (nullable = true)
    

    And now the updateDate is a timestamp adjusted to your Spark session's time zone (that's why the +00:00 offset is gone now) – which, by the way, you can change using spark.conf.set("spark.sql.session.timeZone", "<enter-timezone-here>").

    And if you want to add it to an existing dataframe as a new column you can do something like this:

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import date_format, current_timestamp
    
    spark = SparkSession.builder.getOrCreate()
    
    df = spark.createDataFrame([(1,), (2,)], ["rownum"]) # replace this with your dataframe
    
    df = df.withColumn("updateDate", date_format(current_timestamp(), "yyyy-MM-dd'T'HH:mm:ss.SSSxxx").cast("timestamp"))
    
    df.show(truncate=False)
    df.printSchema()
    
    Output:
    +------+-----------------------+
    |rownum|updateDate             |
    +------+-----------------------+
    |1     |2024-04-04 09:04:48.473|
    |2     |2024-04-04 09:04:48.473|
    +------+-----------------------+
    
    Schema:
    root
     |-- rownum: long (nullable = true)
     |-- updateDate: timestamp (nullable = true)
    

    The datatype you are looking for is a timestamp with a timezone in Spark. Now, you can try loading the dataset into MongoDB with this schema.
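If it helps, a write to MongoDB with this schema might look like the sketch below. This is only an outline: it assumes the MongoDB Spark Connector 10.x is attached to the cluster, and the URI, database, and collection names are placeholders to replace with your own.

```python
# Sketch: write a DataFrame whose updateDate column is a Spark timestamp;
# the connector maps timestamp columns to BSON Date. All option values
# below are placeholders.
(df.write
    .format("mongodb")  # connector 10.x; older 3.x versions use "mongo"
    .option("spark.mongodb.write.connection.uri", "mongodb://host:27017")
    .option("spark.mongodb.write.database", "mydb")
    .option("spark.mongodb.write.collection", "mycol")
    .mode("append")
    .save())
```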

  2. I have tried the below approach in PySpark:

    Starting with a DataFrame whose schema is DataFrame[id: int, date: string]:

    from pyspark.sql.functions import to_date

    df1 = Dilip_df.withColumn("date", to_date("date", "yyyy/MM/dd HH:mm:ss"))
    

    Results:

    +---+----------+
    | id|      date|
    +---+----------+
    |  1|2022-01-01|
    |  2|2022-01-02|
    |  3|2022-01-03|
    +---+----------+
    
    root
     |-- id: integer (nullable = true)
     |-- date: date (nullable = true)
    

    In the code above, I used the withColumn() function together with to_date() to convert the existing "date" column to DateType.
    The format string "yyyy/MM/dd HH:mm:ss" specifies the format of the input date string.

    However, as you mentioned, you are loading documents into MongoDB.
    As per the $toDate (aggregation) documentation:

    $toDate Converts a value to a date. If the value cannot be converted to a date,
    $toDate errors. If the value is null or missing, $toDate returns null.
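For documents already loaded with a string updateDate, the conversion you did with updateMany() can be expressed as an aggregation-pipeline update (MongoDB 4.2+). A sketch in mongosh, where the collection name is a placeholder:

```javascript
// Convert string updateDate values to BSON Date in place (MongoDB 4.2+).
// "mycol" is a placeholder collection name.
db.mycol.updateMany(
  { updateDate: { $type: "string" } },                       // match only strings
  [ { $set: { updateDate: { $toDate: "$updateDate" } } } ]   // pipeline update
)
```

This rewrites each matched document once, so it is a one-time I/O cost; avoiding it entirely means writing the field as a timestamp up front.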

    You can also try the below with MongoDB version 4.2:

    from datetime import datetime
    mydate = datetime.strptime("08/12/1977 09:45:34 AM", "%d/%m/%Y %I:%M:%S %p")
    mydate = mydate.isoformat()
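Note that isoformat() turns the value back into a string; to have PyMongo store a BSON Date, insert the datetime object itself. The parsing step can be checked standalone:

```python
from datetime import datetime

# Parse the sample string; the resulting datetime object (not its
# isoformat() string) is what PyMongo would store as a BSON Date.
mydate = datetime.strptime("08/12/1977 09:45:34 AM", "%d/%m/%Y %I:%M:%S %p")
print(mydate.isoformat())  # 1977-12-08T09:45:34
```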
    

    Reference: String Datatype to Date datatype conversion
    $convert (aggregation)
