
I have the following Scala code in Azure Databricks that creates a DataFrame with the list of files and file properties for a specific folder.

Can someone help me transform that code so it recursively navigates into each subfolder and adds those files to the DataFrame as well?

Here is my code:

    %scala

    def GetFiles(path: String): DataFrame = {
      try {
        spark.createDataFrame(
          dbutils.fs.ls(path).map { info =>
            (info.path, info.name, info.size, info.modificationTime)
          }
        ).toDF("path", "name", "size", "modificationTime").orderBy('name)
      } catch {
        case e: Exception => spark.emptyDataFrame
      }
    }

2 Answers


  1. dbutils.fs.ls does not list files recursively; it only returns the contents of the given folder.

    As a workaround, you can try the approach below.

    Using Files.walkFileTree(), I first collected the list of all file paths recursively.

    Then I extracted the distinct list of parent folder paths from those file paths.

    Then, looping over the distinct subfolder list and calling dbutils.fs.ls on each folder, I unioned the per-folder results into a single DataFrame with the required columns.

    Complete code:

    
    import java.nio.file.attribute.BasicFileAttributes
    import java.nio.file._
    import scala.collection.mutable.MutableList

    // initialize a mutable list to store all file paths
    val files_list = MutableList[String]()

    // root folder, given via the local /dbfs fuse mount
    val root = Paths.get("/dbfs/mnt/target1/")

    // walk the target1 mount folder recursively and store every file path in the list
    // (slice(5, ...) drops the leading "/dbfs" so the paths are plain DBFS paths)
    Files.walkFileTree(root, new SimpleFileVisitor[Path] {
      override def visitFile(file: Path, attrs: BasicFileAttributes) = {
        val p = file.toAbsolutePath.toString
        files_list += p.slice(5, p.length)
        FileVisitResult.CONTINUE
      }
    })

    println(files_list)

    // get the parent folder path of every file
    val subfolders_list = MutableList[String]()
    for (x <- files_list) {
      subfolders_list += x.split("/").slice(0, x.split("/").length - 1).mkString("/")
    }

    // distinct folder paths
    println(subfolders_list.distinct)

    // build the DataFrame from the first folder in the distinct list
    var final_df = spark.createDataFrame(
        dbutils.fs.ls(subfolders_list.distinct(0)).map { info =>
          (info.path, info.name, info.size, info.modificationTime)
        }
      ).toDF("path", "name", "size", "modificationTime")

    // union the files from the remaining folders into the same DataFrame
    for (x <- subfolders_list.distinct.slice(1, subfolders_list.distinct.length)) {
      final_df = final_df.union(spark.createDataFrame(
          dbutils.fs.ls(x).map { info =>
            (info.path, info.name, info.size, info.modificationTime)
          }
        ).toDF("path", "name", "size", "modificationTime"))
    }

    // final DataFrame with the required columns
    display(final_df)
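
    If you prefer to avoid the explicit union loop, the same listing can be built in one pass. This is only a sketch reusing the subfolders_list from above; all_files and final_df2 are hypothetical names, not part of the original answer.

    // sketch: flatMap every distinct folder's listing into one collection
    // and create a single DataFrame, instead of unioning per-folder DataFrames
    val all_files = subfolders_list.distinct.flatMap { folder =>
      dbutils.fs.ls(folder).map { info =>
        (info.path, info.name, info.size, info.modificationTime)
      }
    }
    val final_df2 = spark.createDataFrame(all_files)
      .toDF("path", "name", "size", "modificationTime")

    display(final_df2)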
    
  2. You can accomplish this with a simple recursive function, as shown below. P.S. I wrote this to work with Azure's mssparkutils.fs.ls, but the code is generic: replace XXX with whatever type your dbutils.fs.ls returns.

    /**
      * List all files and folders in the specified path and its sub-folders recursively.
      * The out-of-the-box [[dbutils.fs.ls]] isn't recursive.
      *
      * @param root initial directory to start the listing from
      * @return a [[Seq]] of [[XXX]] objects
      */
    def deepLs(root: String): Seq[XXX] = {
      val these = dbutils.fs.ls(root)
      if (these != null) {
        these ++ these.filter(_.isDir).flatMap(deepLs)
      } else Seq()
    }
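
    As a usage sketch (not part of the original answer): on Databricks, once XXX is replaced by the FileInfo type that dbutils.fs.ls returns, the recursive listing can be turned into the DataFrame the question asks for. The files_df name and the /mnt/target1 path are only illustrative.

    // hypothetical usage: keep files only (drop folders) and build the
    // (path, name, size, modificationTime) DataFrame from the recursive listing
    val files_df = spark.createDataFrame(
        deepLs("/mnt/target1")
          .filter(!_.isDir)
          .map(info => (info.path, info.name, info.size, info.modificationTime))
      ).toDF("path", "name", "size", "modificationTime")
      .orderBy('name)

    display(files_df)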
    