I am running a simple count and I am getting an error

RaghuMundru
New Contributor III

Here is the error that I am getting when I run the following query:

statement=sqlContext.sql("SELECT count(*) FROM ARDATA_2015_09_01").show()

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-17-9282619903a0> in <module>()
----> 1 statement=sqlContext.sql("SELECT count(*) FROM ARDATA_2015_09_01").show()

/home/ubuntu/databricks/spark/python/pyspark/sql/dataframe.py in show(self, n, truncate)
    254         +---+-----+
    255         """
--> 256         print(self._jdf.showString(n, truncate))
    257
    258     def __repr__(self):

/home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    536         answer = self.gateway_client.send_command(command)
    537         return_value = get_return_value(answer, self.gateway_client,
--> 538                 self.target_id, self.name)
    539
    540         for temp_arg in temp_args:

/home/ubuntu/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     34     def deco(*a, **kw):
     35         try:
---> 36             return f(*a, **kw)
     37         except py4j.protocol.Py4JJavaError as e:
     38             s = e.java_exception.toString()

/home/ubuntu/databricks/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    298                 raise Py4JJavaError(
    299                     'An error occurred while calling {0}{1}{2}.\n'.
--> 300                     format(target_id, '.', name), value)
    301             else:
    302                 raise Py4JError(

Py4JJavaError: An error occurred while calling o358.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 4 times, most recent failure: Lost task 0.3 in stage 4.0 (TID 7, 10.61.238.61): ExecutorLostFailure (executor 0 lost)
Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1283)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1271)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1270)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1270)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:697)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:697)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1496)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1458)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1447)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:567)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1825)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1838)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1851)
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:215)
    at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207)
    at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
    at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1385)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56)
    at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1903)
    at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1384)
    at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1314)
    at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1377)
    at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:497)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)


15 REPLIES

arsalan1
Contributor

@Raghu Mundru Is this on Databricks or in a separate deployment?

RaghuMundru
New Contributor III

Yes, it's on Databricks, and I am also unable to convert a file into Parquet, which gives me a new error.



RaghuMundru
New Contributor III

Here is the error I am getting when converting into Parquet:

(error screenshot attached)

RaghuMundru
New Contributor III

Hello,

I would really appreciate it if someone could help me in this regard.

miklos
Contributor

Hi Raghuram,

I checked the shard and noticed a few things. The executors have died and restarted on the cluster, and one of them keeps dying, most likely due to out-of-memory errors. I'd have to dig deeper into why this occurred; it varies depending on the workload being run on the cluster. If you click on the Spark UI -> Executors tab, you can see that executors have crashed multiple times, as the executor IDs keep increasing in increments of 1.

In the UI, I also see that multiple SQLContexts are being created, which isn't necessary. The sqlContext and sc (SparkContext) are already created on the cluster. If you would like to assign a new variable to them, use the following code example:

val sqlContext = SQLContext.getOrCreate(sc)

You can access the sqlContext by using "sqlContext" variable, and the SparkContext using the "sc" variable across the notebooks.
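Since the notebook in this thread is PySpark rather than Scala, here is a minimal equivalent sketch, assuming a PySpark 1.6+ environment where SQLContext.getOrCreate is available (on earlier releases, simply keep using the pre-created sqlContext variable):

# Minimal PySpark sketch (assumes Databricks has already created `sc` and `sqlContext`
# for the notebook; do not construct new SQLContexts in each notebook).
from pyspark.sql import SQLContext

# Reuse the existing context instead of building SQLContext(sc) again.
# SQLContext.getOrCreate(sc) is available in PySpark 1.6+.
sqlContext = SQLContext.getOrCreate(sc)

sqlContext.sql("SELECT count(*) FROM ARDATA_2015_09_01").show()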

I'd recommend restarting this cluster to get things back in a good state.

Let me know if the issue persists after a restart.

RaghuMundru
New Contributor III

Thanks, Miklos, for looking into the issue.

I restarted the cluster twice, but I am running into the same error.

(error screenshot attached)

miklos
Contributor

Looking at the executor logs and failed tasks on your cluster, the issue is with how you're attempting to write the Parquet files out. A failed task writes a partial file out, and re-running that task causes an IOException because the file already exists.

You can see the error by going to the Spark UI -> Failed Tasks -> View Details to see the first executor task that failed.

The job has a very long schema defined, and it doesn't match the input data, which is causing the failure. You would have to clean up the data, or add error handling while converting to a DataFrame, before writing out to Parquet; one possible approach is sketched below.
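For example, a minimal sketch of that kind of guard, assuming the data starts out as an RDD of delimited text lines; raw_rdd, the "|" delimiter, schema (your StructType), and the output path are placeholders for illustration, not names from this thread:

# Hypothetical sketch: drop records whose field count does not match the schema
# before building the DataFrame and writing Parquet. `raw_rdd`, the delimiter,
# `schema`, and the output path are illustrative assumptions.
expected = len(schema.fields)

parsed = raw_rdd.map(lambda line: line.split("|"))
clean = parsed.filter(lambda fields: len(fields) == expected)

df = sqlContext.createDataFrame(clean, schema)
df.write.parquet("/mnt/output/ardata_2015_09_01_parquet")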

RaghuMundru
New Contributor III

Thanks for the reply, @Miklos Christine.

I am unable to see the error in the failed task from the Spark UI, and I am pretty sure that the schema matches the data, since I am able to do other things correctly when querying the data.

Could you please let me know how to add error handling while converting to a DataFrame before writing out to Parquet, or point me to the right resource for it?

Thanks.

miklos
Contributor

Could you send a screenshot of what you see in the Spark UI?

You should see this text: "Failed Jobs (1)"

Click the link in the "Description" field twice to see the number of times this executor has run.

I only see a count() and take(1) being called on the dataset, neither of which performs any validation against the schema you provided. count() just counts the number of records, and take(1) just returns a row.

This is the error:

org.apache.spark.SparkException: Task failed while writing rows.
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:250)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$anonfun$run$1$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$anonfun$run$1$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.IndexOutOfBoundsException: Trying to write more fields than contained in row (554 > 246)
    at org.apache.spark.sql.execution.datasources.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:261)
    at org.apache.spark.sql.execution.datasources.parquet.MutableRowWriteSupport.write(ParquetTableSupport.scala:257)
    at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:121)
    at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:123)
    at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:42)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.writeInternal(ParquetRelation.scala:99)
    at org.apache.spark.sql.execution.datasources.DefaultWriterContainer.writeRows(WriterContainer.scala:242)
    ... 8 more

I just added this code to your notebook to show you that the records do not all have the same number of fields:

# Collect the distinct per-record field counts; a consistent dataset returns a single value.
count = Data.map(lambda x: len(x)).distinct().collect()
print(count)


[1, 554, 555, 560, 309, 246, 89, 221]

Regarding error handling, it is up to you to determine whether you have bad records that you want to recover later, or whether it is a parsing error in the code above. That requires more understanding of your use case, but look over the information provided to understand where this could be happening in your code; one possible pattern is sketched below.
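For instance, a minimal sketch of one way to set bad records aside for later recovery, assuming Data is the RDD of already-split records from the snippet above; schema, the join delimiter, and the output paths are illustrative assumptions:

# Hypothetical sketch: split records by field count so the bad ones can be
# inspected or recovered later; only the good ones go to Parquet.
# `Data` is the RDD from the snippet above; `schema` and paths are placeholders.
expected = len(schema.fields)

good = Data.filter(lambda fields: len(fields) == expected)
bad = Data.filter(lambda fields: len(fields) != expected)

bad.map(lambda fields: "|".join(str(f) for f in fields)).saveAsTextFile("/mnt/output/bad_records")
sqlContext.createDataFrame(good, schema).write.parquet("/mnt/output/clean_parquet")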

RaghuMundru
New Contributor III

Thanks for the valuable information. I will look deeper into the data to understand where this is happening.

RaghuMundru
New Contributor III

@Miklos

I cleaned the data and was able to convert it to Parquet. However, when I try to add new columns to my DataFrame and convert that back into Parquet, it fails.

Any help would be appreciated.

Please open a new forum post for this to keep the issues isolated.
