
I am working on a PySpark script to perform a simple word count. The script runs fine, but I get an error when trying to save the results with saveAsTextFile (I am running this on Ubuntu). Here is the error I get:

py4j.protocol.Py4JJavaError: An error occurred while calling o48.saveAsTextFile. 
org.apache.hadoop.mapred.FileAlreadyExistsException: Output directory file:/home/pyspark_python/wordcount/output_new already exists

Here are the steps I have taken so far:

Verified that the output directory does not contain any data (ls shows it is empty).
Deleted and recreated the directory using rm -r and mkdir -p.
Ensured no other Spark jobs are running (ps aux | grep spark).

Despite this, the error persists when I re-run the script.

Here is the code I am using:

from pyspark import SparkConf, SparkContext
import os

def main(input_file, output_dir):
    # Spark configuration
    conf = SparkConf().setAppName("WordCountTask").setMaster("local[*]")
    sc = SparkContext(conf=conf)

    # Read the input file
    text_file = sc.textFile(input_file)

    # Count the words
    counts = (
        text_file.flatMap(lambda line: line.split(" "))
                 .map(lambda word: (word, 1))
                 .reduceByKey(lambda a, b: a + b)
    )

    # Save the results
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    counts.saveAsTextFile(output_dir)

    print(f"Results saved to directory: {output_dir}")

if __name__ == "__main__":
    # Define the input and output paths
    input_file = r"/home/othniel/pyspark_python/wordcount/input/loremipsum.txt"
    output_dir = "/home/othniel/pyspark_python/wordcount/output_new"

    # Run the WordCount task
    main(input_file, output_dir)

How can I resolve this error and ensure PySpark successfully writes to the output directory? Is there something specific I need to configure in my script or environment?

Thank you for your help!

2 Answers


  1. As the error message states, Output directory file:/home/pyspark_python/wordcount/output_new already exists. Spark expects only the parent directory (os.path.dirname(output_dir)) to exist, not the output directory itself.

    Don’t create it!

    from pyspark import SparkConf, SparkContext
    import os
    
    def main(input_file, output_dir):
        # Spark configuration
        conf = SparkConf().setAppName("WordCountTask").setMaster("local[*]")
        sc = SparkContext(conf=conf)
    
        # Read the input file
        text_file = sc.textFile(input_file)
    
        # Count the words
        counts = (
            text_file.flatMap(lambda line: line.split(" "))
                     .map(lambda word: (word, 1))
                     .reduceByKey(lambda a, b: a + b)
        )
    
        # ---------- DON'T DO THIS ---------------
        # if not os.path.exists(output_dir):              <--------
        #     os.makedirs(output_dir)                     <--------
        # ---------- DON'T DO THIS ---------------
    
        counts.saveAsTextFile(output_dir)
    
        print(f"Results saved to directory: {output_dir}")
    
    if __name__ == "__main__":
        # Define the input and output paths
        input_file = r"/home/othniel/pyspark_python/wordcount/input/loremipsum.txt"
        output_dir = "/home/othniel/pyspark_python/wordcount/output_new"
    
        # Run the WordCount task
        main(input_file, output_dir)
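    If stale output from a previous run keeps tripping this check during local testing, the directory can be removed in the script before saving. A sketch (clear_output_dir is a hypothetical helper, not a Spark API):

```python
import os
import shutil
import tempfile

def clear_output_dir(output_dir):
    # saveAsTextFile fails when output_dir already exists, so remove any
    # leftover directory from a previous run before writing.
    if os.path.exists(output_dir):
        shutil.rmtree(output_dir)

# Demo with a throwaway directory standing in for the real output path:
stale_dir = tempfile.mkdtemp()
clear_output_dir(stale_dir)
print(os.path.exists(stale_dir))  # False
```

    Call it just before counts.saveAsTextFile(output_dir); note this silently discards previous results, which is usually fine only in development.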
    

    PS: You should not be using the SparkContext/RDD interface; use SparkSession/DataFrame instead. E.g.

    from pyspark.sql import SparkSession, DataFrame
    from pyspark.sql import functions as F
    
    def main(input_file, output_dir):
        spark: SparkSession = SparkSession.builder.getOrCreate()
        df_text: DataFrame = spark.read.text(input_file)
        df_counts: DataFrame = (
            df_text
            # Split each line into words, one word per row, then count
            .select(F.explode(F.split("value", " ")).alias("word"))
            .groupBy("word")
            .count()
        )
    
        df_counts.write.csv(output_dir, mode="overwrite")
    
  2. You're attempting to write your output to a path that already exists as a directory.

    A directory is a special type of file on Unix systems, and trying to overwrite a directory with a regular file (such as a text file) will generally fail.
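    The point about directories being special files is easy to see in plain Python, independent of Spark; opening a directory path for writing fails:

```python
import tempfile

# A directory cannot be opened for writing like a regular file;
# on Linux this raises IsADirectoryError.
dir_path = tempfile.mkdtemp()
try:
    open(dir_path, "w")
except IsADirectoryError:
    print("cannot open a directory as a regular file")
```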

    Note, though, that saveAsTextFile expects a path that does not exist yet: Spark creates that path as a directory and writes one or more part files into it. Appending a new name onto the existing directory's path avoids the error only because that new path does not exist yet.

    counts.saveAsTextFile(os.path.join(output_dir, "counts.txt"))
    

    Your results will then be written under:

    /home/othniel/pyspark_python/wordcount/output_new/counts.txt

    Note that counts.txt is itself a directory containing part files (part-00000, ...), not a single text file.
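    Since saveAsTextFile produces a directory of part files rather than one text file, a small helper can stitch local output back together. A sketch (read_spark_text_output is a hypothetical name; this assumes uncompressed output on the local filesystem):

```python
import glob
import os

def read_spark_text_output(output_dir):
    # Collect lines from every part file Spark wrote into output_dir,
    # in part-file order (part-00000, part-00001, ...).
    lines = []
    for part in sorted(glob.glob(os.path.join(output_dir, "part-*"))):
        with open(part) as f:
            lines.extend(line.rstrip("\n") for line in f)
    return lines
```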
    

    Docs for saveAsTextFile, with similar examples: https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.RDD.saveAsTextFile.html

    See here for a (very) detailed answer on how "everything is a file" works on Linux: https://askubuntu.com/a/1073803
