cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

com.databricks.sql.io.FileReadException: Error while reading file dbfs:

smanickam
New Contributor II

I ran the below statement and got the error

%python

data = sqlContext.read.parquet("/FileStore/tables/ganesh.parquet")

display(data)

Error:

SparkException: Job aborted due to stage failure: Task 0 in stage 27.0 failed 1 times, most recent failure: Lost task 0.0 in stage 27.0 (TID 36, localhost, executor driver): com.databricks.sql.io.FileReadException: Error while reading file dbfs:/FileStore/tables/ganesh.parquet. at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.logFileNameAndThrow(FileScanRDD.scala:331) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:310) at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:463) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anonfun$prepareNextFile$1.apply(FileScanRDD.scala:451) at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24) at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable$$anonfun$run$1.apply$mcV$sp(SparkThreadLocalForwardingThreadPoolExecutor.scala:100) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable$$anonfun$run$1.apply(SparkThreadLocalForwardingThreadPoolExecutor.scala:100) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable$$anonfun$run$1.apply(SparkThreadLocalForwardingThreadPoolExecutor.scala:100) at org.apache.spark.util.threads.SparkThreadLocalCapturingHelper$class.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:68) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.runWithCaptured(SparkThreadLocalForwardingThreadPoolExecutor.scala:97) at org.apache.spark.util.threads.SparkThreadLocalCapturingRunnable.run(SparkThreadLocalForwardingThreadPoolExecutor.scala:100) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Caused by: java.lang.RuntimeException: unexpected number of values encountered in Parquet batch, expected = 2 returned = 1 at com.databricks.sql.io.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:368) at com.databricks.sql.io.parquet.DatabricksVectorizedParquetRecordReader.nextBatch(DatabricksVectorizedParquetRecordReader.java:346) at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:159) at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:40) at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:283) ... 14 more

5 REPLIES 5

shyam_9
Valued Contributor
Valued Contributor

Hi @smanickam,

How big is the file and whats the cluster configuration, this is working in my case,

0693f000007OroKAAS

smanickam
New Contributor II

@shyamspr

Thanks for your response. The file has only 1 row of data and I'm using the community edition of databricks. Also please see attached the parquet file that was generated. Please note that I used Parquet.Net (.Net Library for generating parquet File) Nuget Package to generate Parquet File. Also I uploaded the parquet file manually to DBFS. Is there any way I can upload the parquet file so you can validate and let me know where the issue is?

var itemIDColumn = new Parquet.Data.DataColumn(new DataField<string>("ItemID"), new string[] { consumersRatingInfo.ItemID });

             var retailerIDColumn = new Parquet.Data.DataColumn(new DataField&lt;string&gt;("RetailerID"), new string[] { consumersRatingInfo.RetailerID });


             //var ratingDateTimeColumn = new Parquet.Data.DataColumn(new DateTimeDataField&lt;DateTime&gt;("RatingDateTime"), new DateTime[] { consumersRatingInfo.RatingDateTime });
             //var ratingDateTimeColumn = new Parquet.Data.DataColumn(new DateTimeDataField("dateDateAndTime", DateTimeFormat.DateAndTime), new DateTimeOffset[] { consumersRatingInfo.RatingDateTime });
             var ratingValueColumn = new Parquet.Data.DataColumn(new DataField&lt;int&gt;("RatingValue"), new int[] { consumersRatingInfo.RatingValue });
             var itemUPCColumn = new Parquet.Data.DataColumn(new DataField&lt;string&gt;("ItemUPC"), new string[] { consumersRatingInfo.ItemUPC });
             var greceiptIDColumn = new Parquet.Data.DataColumn(new DataField&lt;string&gt;("GReceiptID"), new string[] { consumersRatingInfo.GReceiptID });
             var itemNameColumn = new Parquet.Data.DataColumn(new DataField&lt;string&gt;("ItemName"), new string[] { consumersRatingInfo.ItemName });
             var ratingLabelColumn = new Parquet.Data.DataColumn(new DataField&lt;string&gt;("RatingLabel"), new string[] { consumersRatingInfo.RatingLabel });
             var retailerTypeColumn = new Parquet.Data.DataColumn(new DataField&lt;string&gt;("RetailerType"), new string[] { consumersRatingInfo.RetailerType });


             //var ratingColumn = new Parquet.Data.DataColumn(new DataField&lt;List&lt;ConsumersRatingAttributesInfo&gt;&gt;("Rating"), new List&lt;ConsumersRatingAttributesInfo&gt;[] { consumersRatingAttributesInfos });
             // create file schema
             var schema = new Schema(idColumn.Field, cityColumn.Field, itemIDColumn.Field, retailerIDColumn.Field,  ratingValueColumn.Field, itemUPCColumn.Field, greceiptIDColumn.Field, itemNameColumn.Field, ratingLabelColumn.Field, retailerTypeColumn.Field);


             using (Stream fileStream = System.IO.File.OpenWrite("c:\\Work\\ganesh.parquet"))
             {
                 using (var parquetWriter = new ParquetWriter(schema, fileStream))
                 {
                     // create a new row group in the file
                     using (ParquetRowGroupWriter groupWriter = parquetWriter.CreateRowGroup())
                     {
                         groupWriter.WriteColumn(idColumn);
                         groupWriter.WriteColumn(cityColumn);
                         groupWriter.WriteColumn(itemIDColumn);
                         groupWriter.WriteColumn(retailerIDColumn);
                        // groupWriter.WriteColumn(ratingDateTimeColumn);
                         groupWriter.WriteColumn(ratingValueColumn);
                         groupWriter.WriteColumn(itemUPCColumn);
                         groupWriter.WriteColumn(greceiptIDColumn);
                         groupWriter.WriteColumn(itemNameColumn);
                         groupWriter.WriteColumn(ratingLabelColumn);
                         groupWriter.WriteColumn(retailerTypeColumn);
                     }
                 }
             }

smanickam
New Contributor II

@shyamspr - Please respond

MatthewSzafir
New Contributor III

I'm having a similar issue reading a JSON file. It is ~550MB compressed and is on a single line:

val cfilename = "c_datafeed_20200128.json.gz" 
val events = spark.read.json(s"/mnt/c/input1/$cfilename") 
display(events)

The filename is correct and the file is all a single line of JSON starting with [{" and ending with "}]. It is over 4GB decompressed. I'm using a Standard_DS4_v2 (28.0 GB Memory, 8 Cores, 1.5 DBU)

20/02/19 20:32:34 ERROR FileScanRDD: Error while reading file dbfs:/mnt/c/input1/c_datafeed_20200128.json.gz.
20/02/19 20:32:34 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 16)
com.databricks.sql.io.FileReadException: Error while reading file 
Error while reading file dbfs:/mnt/c/input1/c_datafeed_20200128.json.gz.
    at org.apache.spark.sql.execution.datasources.FileScanRDD$anon$1$anon$2.logFileNameAndThrow(FileScanRDD.scala:331)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$anon$1$anon$2.getNext(FileScanRDD.scala:310)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$anon$1.nextIterator(FileScanRDD.scala:397)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$anon$1.hasNext(FileScanRDD.scala:250)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anonfun$13$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
    at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
    at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$anonfun$1.apply(JsonInferSchema.scala:70)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$anonfun$1.apply(JsonInferSchema.scala:50)
    at org.apache.spark.rdd.RDD$anonfun$mapPartitions$1$anonfun$apply$23.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$anonfun$mapPartitions$1$anonfun$apply$23.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:353)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:317)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:140)
    at org.apache.spark.scheduler.Task.run(Task.scala:113)
    at org.apache.spark.executor.Executor$TaskRunner$anonfun$13.apply(Executor.scala:533)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1541)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:539)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.OutOfMemoryError: Requested array size exceeds VM limit
    at java.util.Arrays.copyOf(Arrays.java:3236)
    at org.apache.hadoop.io.Text.setCapacity(Text.java:266)
    at org.apache.hadoop.io.Text.append(Text.java:236)
    at org.apache.hadoop.util.LineReader.readDefaultLine(LineReader.java:243)
    at org.apache.hadoop.util.LineReader.readLine(LineReader.java:174)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.skipUtfByteOrderMark(LineRecordReader.java:144)
    at org.apache.hadoop.mapreduce.lib.input.LineRecordReader.nextKeyValue(LineRecordReader.java:184)
    at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:40)
    at org.apache.spark.sql.execution.datasources.HadoopFileLinesReader.hasNext(HadoopFileLinesReader.scala:69)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:409)
    at scala.collection.Iterator$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$anon$1$anon$2.getNext(FileScanRDD.scala:283)
    at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$anon$1.nextIterator(FileScanRDD.scala:397)
    at org.apache.spark.sql.execution.datasources.FileScanRDD$anon$1.hasNext(FileScanRDD.scala:250)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$anonfun$13$anon$1.hasNext(WholeStageCodegenExec.scala:640)
    at scala.collection.Iterator$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.isEmpty(Iterator.scala:331)
    at scala.collection.AbstractIterator.isEmpty(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceLeftOption(TraversableOnce.scala:203)
    at scala.collection.AbstractIterator.reduceLeftOption(Iterator.scala:1334)
    at scala.collection.TraversableOnce$class.reduceOption(TraversableOnce.scala:210)
    at scala.collection.AbstractIterator.reduceOption(Iterator.scala:1334)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$anonfun$1.apply(JsonInferSchema.scala:70)
    at org.apache.spark.sql.catalyst.json.JsonInferSchema$anonfun$1.apply(JsonInferSchema.scala:50)
    at org.apache.spark.rdd.RDD$anonfun$mapPartitions$1$anonfun$apply$23.apply(RDD.scala:830)
    at org.apache.spark.rdd.RDD$anonfun$mapPartitions$1$anonfun$apply$23.apply(RDD.scala:830)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:353)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:317)

I resolved this issue by increasing my cluster and worker size. I also added .option("multiline", "true") to the spark.read.json command. This seemed counter intuitive as the JSON was all on one line but it worked.

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.