02-11-2022 12:15 AM
I'm reading a huge csv file including 39,795,158 records and writing into MSSQL server, on Azure Databricks. The Databricks(notebook) is running on a cluster node with 56 GB Memory, 16 Cores, and 12 workers.
This is my code in Python and PySpark:
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from time import sleep
url = "jdbc:sqlserver://{0}:{1};database={2}".format(server, port, database)
spark.conf.set("spark.databricks.io.cache.enabled", True)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
# Read csv file.
df_lake = spark.read \
.option('header', 'false') \
.schema(s) \
.option('delimiter', ',') \
.csv('wasbs://...')
batch_size = 60000
rows = df_lake.count()
org_pts = df_lake.rdd.getNumPartitions() # 566
new_pts = 1990
# Re-partition the DataFrame
df_repartitioned = df_lake.repartition(new_pts)
# Write the DataFrame into MSSQL server, by using JDBC driver
df_repartitioned.write \
.format("jdbc") \
.mode("overwrite") \
.option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
.option("url", url) \
.option("dbtable", tablename) \
.option("user", username) \
.option("password", password) \
.option("batchsize", batch_size) \
.save()
sleep(10)
Then I got the logs and errors as following as:
```
rows: 39795158
org_pts: 566
new_pts: 1990
Copy error: An error occurred while calling o9647.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 62 in stage 462.0 failed 4 times, most recent failure: Lost task 62.3 in stage 462.0 (TID 46609) (10.139.64.12 executor 27): com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:234)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:1217)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.rollback(SQLServerConnection.java:3508)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:728)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:857)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:855)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1025)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1025)
at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2517)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:91)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1620)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
...
```
For 3 - 6 millions records, it was no problem. But for 10 millions or above records, it was failed. I'm not sure why it was happened on 10 millions or above records.
Are there any solutions for huge DataFrame process on Azure Databricks?
I posted this error on StackOverflow, too. You can check the error on https://stackoverflow.com/questions/71076648/cant-write-big-dataframe-into-mssql-server-by-using-jdb...
02-15-2022 06:29 AM
Hi,
If you are using Azure SQL DB Managed instance, could you please file a support request with Azure team? This is to review any timeouts, perf issues on the backend.
Also, it seems like the timeout is coming from SQL Server which is closing the connection after some time.
You can possibly add below this in the JDBC connection string as a param.
;queryTimeout=7200
The below connector developed by Microsoft SQL team has some bulk load options to speed up large data loads. Please give it a try
02-11-2022 02:58 AM
I don´t see why you use databricks for this. There is no data transformations, only moving data from csv to a database.
Why don´t you do this with Data Factory?
The thing is: Databricks will not have issues processing this csv file, but when you write it to a database, that will become the bottleneck. You also have a cluster running which is basically mainly waiting.
02-11-2022 06:19 AM
Thanks for your response!
Actually, I don't know well about Data Factory..
To speed up to process big DataFrame, I used repartition mode, according to this blog: https://devblogs.microsoft.com/azure-sql/partitioning-on-spark-fast-loading-clustered-columnstore-in...
Could you let me know how can I use it to overwrite all data in csv file into MSSQL? Is it easy?
02-11-2022 06:32 AM
Data Factory is pretty easy. Basically you define a source where the data resides using a UI.
Define a sink where the data has to land, also using a UI and then execute a job.
For plain data movement it is ideal. But if you have to add transformations etc, I prefer Databricks (you can do this also in Data Factory but I do not like the no/low code tools).
02-11-2022 06:57 AM
I should make some changes in each records. After that, I write them into MSSQL. So looks like the Databricks is better to use it...
I just needed to speed up to write big dataframe into MSSQL db and fix the error.
02-11-2022 07:10 AM
you can try to land the transformed data on some storage in Azure/AWS,
then copy those files to the db using Data Factory or Glue (AWS)
02-11-2022 05:31 PM
It's not solution what I want. sorry.
I want to know what's wrong in my code and the reason why it was failed. and I want to improve the performance for big DataFrame processing.
02-15-2022 06:29 AM
Hi,
If you are using Azure SQL DB Managed instance, could you please file a support request with Azure team? This is to review any timeouts, perf issues on the backend.
Also, it seems like the timeout is coming from SQL Server which is closing the connection after some time.
You can possibly add below this in the JDBC connection string as a param.
;queryTimeout=7200
The below connector developed by Microsoft SQL team has some bulk load options to speed up large data loads. Please give it a try
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.
If there isn’t a group near you, start one and help create a community that brings people together.
Request a New Group