
The task I would like to solve:
I have a constant influx of files in a specific folder held on azure storage. I would like to periodically list the files in this folder in order to copy them to a different location. I am keeping track of what I have copied already, so I am removing already copied files from the list of files I have in the original folder.

The problem with this:
Just listing the file paths using InMemoryFileIndex is very slow. In my code, y is a Set of String holding the file paths currently in the folder; I later remove the already-copied paths from it:

// bulkListLeafFiles returns Seq[(Path, Seq[FileStatus])]
val x = InMemoryFileIndex.bulkListLeafFiles(... my parameters ...)
val y = x.flatMap(_._2.map(_.getPath.toString)).toSet
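
For reference, the tracking step itself is just a set difference; something along these lines, where alreadyCopied is a hypothetical Set[String] I maintain from previous runs:

val alreadyCopied: Set[String] = Set() // loaded from my own tracking store, illustrative only
val toCopy = y -- alreadyCopied        // only paths not yet copied remain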

Question: is there a smarter way to go about listing all my files?
Can I, for example, build some kind of table that represents all files inside the folder and also captures incoming ones, so I don't have to call InMemoryFileIndex to iterate through everything again and again?

2 Answers


  1. Chosen as BEST ANSWER

    The solution I am trying right now is:

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.streaming.Trigger

    val streamingQuery = spark.readStream
      .format("binaryFile")
      .schema("`path` STRING, `modificationTime` TIMESTAMP, `length` BIGINT, `content` BINARY")
      .option("recursiveFileLookup", "true")
      .load("my path here")
      .filter(col("modificationTime") > "2023-10-30 07:00:00")
      .writeStream
      .trigger(Trigger.Once())
      .foreachBatch { (batch: DataFrame, batchId: Long) =>
        // my code goes here on how I copy files
      }
      .option("checkpointLocation", "my path here")
      .start()
      .awaitTermination()
    
    

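    For the foreachBatch part, here is a minimal sketch of what the copy step could look like, assuming the files should land in a single destination folder (the destination path, the helper name copyBatch, and the Hadoop FileSystem based copy are illustrative assumptions, not my actual code):

    import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}
    import org.apache.spark.sql.DataFrame

    // Hypothetical copy step: take the paths listed in this micro-batch and
    // copy each file into an assumed destination folder.
    def copyBatch(batchDF: DataFrame, batchId: Long): Unit = {
      val conf = spark.sparkContext.hadoopConfiguration
      val destDir = new Path("my destination path here")
      val destFs = destDir.getFileSystem(conf)
      batchDF.select("path").collect().foreach { row =>
        val src = new Path(row.getString(0))
        val srcFs = src.getFileSystem(conf)
        // deleteSource = false keeps the original file; set it to true to move instead
        FileUtil.copy(srcFs, src, destFs, new Path(destDir, src.getName), false, conf)
      }
    }

    Something like this could then be plugged in as .foreachBatch(copyBatch _) in the query above.
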
    Now I have tested this on a folder with a couple of files and it is working. My next question would be:
    does the checkpoint only register files with col("modificationTime") > "2023-10-30 07:00:00", or all files it has read in the folder?


  2. Directly, there is no faster way; ADLS is simply very slow at listing files (although it has improved over the last few years), and Hadoop's listing also uses threads, which quickly eat up the available connections on a storage account. For this reason it's worth using Delta over plain Parquet (irrespective of whether you use Databricks for the runtime).

    Look at using events and reacting as they arrive rather than starting new jobs on a timer, or keep a job running that watches arrival events and stores them in a log to be processed in a later batch. If you are using Databricks, look at Auto Loader for another possible approach (a rough sketch follows below).
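
    As an illustration of the Auto Loader route, a minimal sketch of the source side, assuming a Databricks runtime; the path is a placeholder and the options shown are just one possible configuration:

    // Auto Loader ("cloudFiles") source: with useNotifications enabled it reacts to
    // storage arrival events instead of re-listing the whole folder on every run.
    val incoming = spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "binaryFile")
      .option("cloudFiles.useNotifications", "true")
      .load("my source path here")

    The copy logic in foreachBatch can stay the same; only the file discovery changes.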

    I'd also suggest that, if the file arrival / sending is in your control, you have the sender emit an event once each file has been processed.
