03-06-2024 01:40 AM
Hi,
We are trying to ingest zip files into an Azure Databricks Delta Lake table using the COPY INTO command.
There are 100+ zip files with an average size of ~300 MB each.
Cluster configuration:
The following Spark parameters are set at the cluster level:
The following Spark parameters are set at the notebook level (while running the COPY INTO command):
spark = (
    SparkSession.builder.appName("YourApp")
    .config("spark.sql.execution.arrow.enabled", "true")
    .config("spark.sql.execution.arrow.maxRecordsPerBatch", "100")
    .config("spark.databricks.io.cache.maxFileSize", "2G")
    .config("spark.network.timeout", "1000s")
    .config("spark.driver.maxResultSize", "2G")
    .getOrCreate()
)
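(A hedged aside on this snippet: in a Databricks notebook a SparkSession already exists, so builder-style .config() values passed to getOrCreate() may not take effect on the live session, and spark.sql.execution.arrow.enabled has been superseded by spark.sql.execution.arrow.pyspark.enabled in Spark 3.x. A minimal sketch of applying the runtime-settable options to the existing session instead, reusing the values from the post:)

# Sketch only: set runtime-settable SQL confs on the existing Databricks session.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "100")

# Static settings such as spark.network.timeout, spark.driver.maxResultSize and
# spark.databricks.io.cache.maxFileSize generally need to go in the cluster's
# Spark config rather than being set from a notebook.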
We are consistently getting the following error while trying to ingest the zip files:
Job aborted due to stage failure: Task 77 in stage 33.0 failed 4 times, most recent failure: Lost task 77.3 in stage 33.0 (TID 1667) (10.139.64.12 executor 20): ExecutorLostFailure (executor 20 exited caused by one of the running tasks) Reason: Command exited with code 50

The error stack looks like this:

Py4JJavaError: An error occurred while calling o360.sql.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 77 in stage 33.0 failed 4 times, most recent failure: Lost task 77.3 in stage 33.0 (TID 1667) (10.139.64.12 executor 20): ExecutorLostFailure (executor 20 exited caused by one of the running tasks) Reason: Command exited with code 50
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:3628)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:3559)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:3546)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:3546)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1521)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1521)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1521)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3875)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3787)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:3775)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:51)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$runJob$1(DAGScheduler.scala:1245)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV
This works for a smaller number of zip files (up to 20). Even that did not work with the default cluster configuration; we had to increase the driver and worker sizes and raise the parallelism and executor memory options at the cluster level, as mentioned above. Now even this higher configuration fails when trying to ingest more zip files. We would prefer not to increase the cluster configuration any further, since that is not an optimal solution and the number of files can keep increasing.
Please advise.
CC: @Anup
03-07-2024 12:59 AM
Hi @nikhilmb, it seems you're encountering issues while ingesting zip files into Azure Databricks Delta Lake using the COPY INTO command.
Let’s troubleshoot this together.
First, let's review some key points related to COPY INTO:

COPY INTO Command: The COPY INTO SQL command allows you to load data from a file location into a Delta table.

Requirements: Review the requirements for COPY INTO in the official documentation.

Example: Load Data into a Schemaless Delta Lake Table:
CREATE TABLE IF NOT EXISTS my_table;

COPY INTO my_table
FROM '/path/to/files'
FILEFORMAT = <format>
FORMAT_OPTIONS ('mergeSchema' = 'true')
COPY_OPTIONS ('mergeSchema' = 'true');
For a schemaless table, the schema is created during COPY INTO. After data ingestion, the table becomes queryable.

Example: Set Schema and Load Data into a Delta Lake Table:
-- Create target table and load data
CREATE TABLE IF NOT EXISTS user_ping_target;
COPY INTO user_ping_target
FROM ${c.source}
FILEFORMAT = JSON
FORMAT_OPTIONS ('mergeSchema' = 'true');
Common Data Loading Patterns: COPY INTO supports a number of common data loading patterns; you can explore them in the official documentation.

Now, let's address the error you're encountering. The error message indicates that Task 77 in stage 33.0 failed due to an ExecutorLostFailure with exit code 50.
Here are some steps to troubleshoot:
Resource Constraints: Check whether the executors are running out of memory or local disk while processing the large zip files; an ExecutorLostFailure usually points to resource exhaustion on the worker.
Executor Failures: Review the failed executor's logs and the Spark UI to confirm why executor 20 was lost.
Network Timeout: The spark.network.timeout parameter is set to 1000s. Consider adjusting this value based on your network conditions.
Driver Configuration: Verify that the driver has enough memory and that spark.driver.maxResultSize (currently 2G) is not being exceeded.
Autoscaling: Consider enabling autoscaling so the cluster can add workers as the number of files grows.
Retry Mechanism: Since COPY INTO is re-triable, consider implementing a retry mechanism in your code to handle transient failures (a sketch follows below).

If you need further assistance, feel free to ask! 😊
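A minimal sketch of such a retry wrapper, assuming the COPY INTO statement is issued from a notebook via spark.sql; the table name, path, file format, and retry settings below are placeholders, not values from this thread:

import time

def copy_into_with_retry(spark, sql_text, max_attempts=3, backoff_seconds=60):
    # COPY INTO is idempotent (already-loaded files are skipped), so re-running
    # the same statement after a transient failure is safe.
    for attempt in range(1, max_attempts + 1):
        try:
            return spark.sql(sql_text)
        except Exception as err:  # ideally narrow this to known transient errors
            if attempt == max_attempts:
                raise
            print(f"COPY INTO attempt {attempt} failed: {err}; retrying in {backoff_seconds}s")
            time.sleep(backoff_seconds)

# Hypothetical usage; replace the table, path, and file format with real values.
copy_into_with_retry(
    spark,
    "COPY INTO my_table FROM '/path/to/files' FILEFORMAT = <format>",
)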
03-11-2024 12:36 AM
Thanks for the response.
We tried all the suggestions in the post. It's still failing.
I think Spark tries to unzip the files during ingestion, and that is where it runs out of memory. Maybe ingesting zip files is not supported yet. We are now exploring the Unity Catalog Volumes option to ingest the zip files and access them in the Delta lake.
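For anyone following the same route, a rough sketch of landing zip files in a Unity Catalog volume with dbutils; the source container and the catalog/schema/volume names below are hypothetical:

# Hypothetical paths: replace the source container and the UC volume location.
source_path = "abfss://raw@mystorageaccount.dfs.core.windows.net/zips/"
volume_path = "/Volumes/main/ingest/zip_landing/"

# Copy the zip files as-is into the volume; they are not decompressed.
dbutils.fs.cp(source_path, volume_path, recurse=True)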
03-14-2024 01:34 AM
Just in the hope that this might benefit other users: we have decided to go with the good old way of mounting the cloud object store onto DBFS and then ingesting data from the mounted drive into a Unity Catalog-managed volume. We tried this with the 500+ zip files and it is working as expected.
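A rough sketch of that mount-then-copy approach, assuming an ADLS Gen2 container, a service principal whose credentials live in a secret scope, and an existing UC volume; every name below (my_scope, mystorageaccount, <tenant-id>, /mnt/zip-source, /Volumes/main/ingest/zip_landing) is a placeholder:

# OAuth configs for mounting an ADLS Gen2 container with a service principal.
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": dbutils.secrets.get("my_scope", "sp-client-id"),
    "fs.azure.account.oauth2.client.secret": dbutils.secrets.get("my_scope", "sp-client-secret"),
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
}

# Mount the container onto DBFS, then copy the zip files into the UC volume.
dbutils.fs.mount(
    source="abfss://raw@mystorageaccount.dfs.core.windows.net/",
    mount_point="/mnt/zip-source",
    extra_configs=configs,
)
dbutils.fs.cp("/mnt/zip-source/zips/", "/Volumes/main/ingest/zip_landing/", recurse=True)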
04-10-2024 02:33 AM
Although we were able to copy the zip files onto the Databricks volume, we were not able to share them with any system outside of the Databricks environment. I guess Delta Sharing does not support sharing files that are on UC volumes.