
I’m working on a dataset (~200 GB) using Spark in Azure Databricks. I can read the data from blob storage and transform it in various ways. However, every time I try to write it out, whether through .saveAsTable(), .csv(), or similar, I get the error below. I also get it when using .select() (and various other functions), as seen in the stack trace.

Any suggestions as to what could be wrong here would be highly appreciated!

Note: I’m not a Spark, Azure or Python expert, so the more explicit the better 🙂

EDIT
Examples of code returning this error:

Example 1

df = spark.read.csv(f"wasbs://{container}@{storage_account_name}.blob.core.windows.net/")

df.write.mode("overwrite").saveAsTable("test")

Example 2

df.write.csv("test")

Example 3

df.select().where(df['installationId'] == '').count()

All of these give the same error:

    Py4JJavaError                             Traceback (most recent call last)
<command-2838615377381489> in <cell line: 1>()
----> 1 numInstallationId = df_android.select().where((df_android['installationId'] == '') | (df_android['installationId'] == None)).count()

/databricks/spark/python/pyspark/sql/dataframe.py in count(self)
    807         2
    808         """
--> 809         return int(self._jdf.count())
    810 
    811     def collect(self) -> List[Row]:

/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1319 
   1320         answer = self.gateway_client.send_command(command)
-> 1321         return_value = get_return_value(
   1322             answer, self.gateway_client, self.target_id, self.name)
   1323 

/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
    194     def deco(*a: Any, **kw: Any) -> Any:
    195         try:
--> 196             return f(*a, **kw)
    197         except Py4JJavaError as e:
    198             converted = convert_exception(e.java_exception)

/databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    324             value = OUTPUT_CONVERTER[type](answer[2:], gateway_client)
    325             if answer[1] == REFERENCE_TYPE:
--> 326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
    328                     format(target_id, ".", name), value)

Py4JJavaError: An error occurred while calling o1449.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 234 in stage 18.0 failed 4 times, most recent failure: Lost task 234.3 in stage 18.0 (TID 1145) (10.139.64.18 executor 15): com.databricks.sql.io.FileReadException: Error while reading file wasbs:REDACTED_LOCAL_PART@**REMOVED BY ME**.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.logFileNameAndThrow(FileScanRDD.scala:544)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:513)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:637)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$hasNext$1(FileScanRDD.scala:359)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:354)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:870)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1690)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:873)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:728)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.univocity.parsers.common.TextParsingException: java.lang.ArrayIndexOutOfBoundsException - null
Parser Configuration: CsvParserSettings:
    Auto configuration enabled=true
    Auto-closing enabled=true
    Autodetect column delimiter=false
    Autodetect quotes=false
    Column reordering enabled=true
    Delimiters for detection=null
    Empty value=
    Escape unquoted values=false
    Header extraction enabled=null
    Headers=null
    Ignore leading whitespaces=false
    Ignore leading whitespaces in quotes=false
    Ignore trailing whitespaces=false
    Ignore trailing whitespaces in quotes=false
    Input buffer size=1048576
    Input reading on separate thread=false
    Keep escape sequences=false
    Keep quotes=false
    Length of content displayed on error=1000
    Line separator detection enabled=false
    Maximum number of characters per column=-1
    Maximum number of columns=20480
    Normalize escaped line separators=true
    Null value=
    Number of records to read=all
    Processor=none
    Restricting data in exceptions=false
    RowProcessor error handler=null
    Selected fields=field selection: [0]
    Skip bits as whitespace=true
    Skip empty lines=true
    Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
    CsvFormat:
        Comment character=#
        Field delimiter=)
    Line separator (normalized)=\n
    Line separator sequence=\n
        Quote character="
        Quote escape character=
        Quote escape escape character=null
Internal state when error was thrown: line=0, column=20481, record=0, charIndex=5890569
    at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:402)
    at com.univocity.parsers.common.AbstractParser.parseLine(AbstractParser.java:707)
    at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:288)
    at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:446)
    at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:92)
    at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:454)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:478)
    ... 33 more
Caused by: java.lang.ArrayIndexOutOfBoundsException

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3247)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3181)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3175)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3175)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1412)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1412)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1412)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3456)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3397)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3385)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
Caused by: com.databricks.sql.io.FileReadException: Error while reading file wasbs:REDACTED_LOCAL_PART@**REMOVED BY ME**
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.logFileNameAndThrow(FileScanRDD.scala:544)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:513)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:637)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$hasNext$1(FileScanRDD.scala:359)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:354)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.hashAgg_doAggregateWithoutKey_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:759)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:81)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:81)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:168)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:136)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:96)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:870)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1690)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:873)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:728)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: com.univocity.parsers.common.TextParsingException: java.lang.ArrayIndexOutOfBoundsException - null
Parser Configuration: CsvParserSettings:
    Auto configuration enabled=true
    Auto-closing enabled=true
    Autodetect column delimiter=false
    Autodetect quotes=false
    Column reordering enabled=true
    Delimiters for detection=null
    Empty value=
    Escape unquoted values=false
    Header extraction enabled=null
    Headers=null
    Ignore leading whitespaces=false
    Ignore leading whitespaces in quotes=false
    Ignore trailing whitespaces=false
    Ignore trailing whitespaces in quotes=false
    Input buffer size=1048576
    Input reading on separate thread=false
    Keep escape sequences=false
    Keep quotes=false
    Length of content displayed on error=1000
    Line separator detection enabled=false
    Maximum number of characters per column=-1
    Maximum number of columns=20480
    Normalize escaped line separators=true
    Null value=
    Number of records to read=all
    Processor=none
    Restricting data in exceptions=false
    RowProcessor error handler=null
    Selected fields=field selection: [0]
    Skip bits as whitespace=true
    Skip empty lines=true
    Unescaped quote handling=STOP_AT_DELIMITERFormat configuration:
    CsvFormat:
        Comment character=#
        Field delimiter=)
    Line separator (normalized)=\n
    Line separator sequence=\n
        Quote character="
        Quote escape character=
        Quote escape escape character=null
Internal state when error was thrown: line=0, column=20481, record=0, charIndex=5890569
    at com.univocity.parsers.common.AbstractParser.handleException(AbstractParser.java:402)
    at com.univocity.parsers.common.AbstractParser.parseLine(AbstractParser.java:707)
    at org.apache.spark.sql.catalyst.csv.UnivocityParser.$anonfun$parse$2(UnivocityParser.scala:288)
    at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$1(UnivocityParser.scala:446)
    at org.apache.spark.sql.catalyst.util.FailureSafeParser.parse(FailureSafeParser.scala:92)
    at org.apache.spark.sql.catalyst.csv.UnivocityParser$.$anonfun$parseIterator$2(UnivocityParser.scala:454)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:478)
    ... 33 more
Caused by: java.lang.ArrayIndexOutOfBoundsException

2 Answers


  1. Chosen as BEST ANSWER

    Ok, so it seems the problem was related to reading the blob as CSV. The content in blob storage was probably not well-formed CSV, so reading it as plain text solved the issue:

    df = spark.read.option("header", "true").text(file_location)
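
    If it helps, a quick way to confirm the content really isn’t well-formed CSV is to look at a few raw lines after loading them as text. This is only a sketch, assuming file_location points at the same container path as above; note that the "header" option is not used by the text reader, so it can be dropped.

    # Inspect the first few raw lines to see why the CSV parser choked.
    df = spark.read.text(file_location)
    df.show(5, truncate=False)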
    

  2. I reproduced the same thing in my environment and got the same error. To overcome it, follow the syntax and references below.

    Create a mount point to your storage account from Databricks, using the storage account access key (found under Access keys in the Azure portal). The mount exposes the container as a DBFS path on which you can perform both read and write operations:

    dbutils.fs.mount(
        source = "wasbs://<container_name>@<storage_account_name>.blob.core.windows.net/",
        mount_point = "/mnt/<Folder_name>",
        extra_configs = {"fs.azure.account.key.<storage_account_name>.blob.core.windows.net": "<access-key>"})
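
    As a quick sanity check, you can list the mount points and the files under the new one (the mount point name is the placeholder used above):

    # Verify the mount: list all mount points and the blobs visible under the new one.
    display(dbutils.fs.mounts())
    display(dbutils.fs.ls("/mnt/<Folder_name>"))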
    


    # Reading data
    file_location = "/mnt/demo123"

    df = (spark.read.format("csv")
          .option("inferSchema", "true")
          .option("header", "true")
          .option("sep", ",")
          .load(file_location))
    display(df)
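
    For what it’s worth, the stack trace in the question shows the CSV parser blowing past its default limit of 20,480 columns (column=20481), which usually points to a wrong delimiter, unbalanced quotes, or embedded newlines. If you still want to read the data as CSV rather than text, a defensive sketch like this may help (standard Spark CSV options plus the Databricks-specific badRecordsPath; the paths are just examples):

    # Read the CSV defensively so malformed rows don't abort the whole job.
    df = (spark.read.format("csv")
          .option("header", "true")
          .option("multiLine", "true")                            # tolerate newlines inside quoted fields
          .option("maxColumns", "25000")                          # raise the parser's 20480-column default
          .option("badRecordsPath", "/mnt/demo123/bad_records")   # Databricks: park unparseable rows here
          .load(file_location))
    display(df)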
    


    # Writing data
    df.write.mode("overwrite").option("header", "true").csv("/mnt/demo123")
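
    If you need a single CSV file rather than one part file per partition, you can coalesce first (the output folder name below is just an example; coalesce(1) funnels everything through one task, so it can be slow for ~200 GB):

    # Collapse to one partition so the output folder contains a single CSV part file.
    df.coalesce(1).write.mode("overwrite").option("header", "true").csv("/mnt/demo123/single_csv")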
    


    Or

    Without a mount, you can also read and write directly against the container:
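
    A minimal sketch of that, assuming account-key authentication (the account, container and key are placeholders):

    # Access the blob container directly over wasbs, without a mount.
    spark.conf.set(
        "fs.azure.account.key.<storage_account_name>.blob.core.windows.net",
        "<access-key>")

    path = "wasbs://<container_name>@<storage_account_name>.blob.core.windows.net/"
    df = spark.read.format("csv").option("header", "true").load(path)
    df.write.mode("overwrite").option("header", "true").csv(path + "output")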

    For more information, refer to this link and the MS documentation.
