Can't write big DataFrame into MSSQL server by using jdbc driver on Azure Databricks

ninjadev999
New Contributor II

I'm reading a huge CSV file containing 39,795,158 records and writing it into MSSQL Server on Azure Databricks. The Databricks notebook is running on a cluster with 56 GB memory, 16 cores, and 12 workers.

This is my code in Python and PySpark:

```
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from time import sleep

url = "jdbc:sqlserver://{0}:{1};database={2}".format(server, port, database)
spark.conf.set("spark.databricks.io.cache.enabled", True)
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Read csv file.
df_lake = spark.read \
    .option('header', 'false') \
    .schema(s) \
    .option('delimiter', ',') \
    .csv('wasbs://...')

batch_size = 60000
rows = df_lake.count()
org_pts = df_lake.rdd.getNumPartitions()  # 566
new_pts = 1990

# Re-partition the DataFrame
df_repartitioned = df_lake.repartition(new_pts)

# Write the DataFrame into MSSQL server, by using JDBC driver
df_repartitioned.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", url) \
    .option("dbtable", tablename) \
    .option("user", username) \
    .option("password", password) \
    .option("batchsize", batch_size) \
    .save()

sleep(10)
```

Then I got the following logs and errors:

```
rows: 39795158
org_pts: 566
new_pts: 1990

Copy error: An error occurred while calling o9647.save.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 62 in stage 462.0 failed 4 times, most recent failure: Lost task 62.3 in stage 462.0 (TID 46609) (10.139.64.12 executor 27): com.microsoft.sqlserver.jdbc.SQLServerException: The connection is closed.
    at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDriverError(SQLServerException.java:234)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.checkClosed(SQLServerConnection.java:1217)
    at com.microsoft.sqlserver.jdbc.SQLServerConnection.rollback(SQLServerConnection.java:3508)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.savePartition(JdbcUtils.scala:728)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1(JdbcUtils.scala:857)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.$anonfun$saveTable$1$adapted(JdbcUtils.scala:855)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1025)
    at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1025)
    at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2517)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:150)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:119)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:91)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:813)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1620)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:816)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:672)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
    ...
```

For 3-6 million records there was no problem, but for 10 million or more records it failed. I'm not sure why it fails at 10 million or more records.

Are there any solutions for processing huge DataFrames on Azure Databricks?

I posted this error on Stack Overflow, too. You can check it at https://stackoverflow.com/questions/71076648/cant-write-big-dataframe-into-mssql-server-by-using-jdb...

1 ACCEPTED SOLUTION


User16764241763
Honored Contributor

Hi,

If you are using Azure SQL DB Managed Instance, could you please file a support request with the Azure team? This is to review any timeouts or performance issues on the backend.

Also, it seems like the timeout is coming from SQL Server, which is closing the connection after some time.

You can possibly add the parameter below to the JDBC connection string:

;queryTimeout=7200
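
A minimal sketch of how that might look with the code from the question (assuming the same server, port, database, and write variables used there; 7200 seconds is just the value from above):

```
# Hypothetical sketch: append queryTimeout (in seconds) to the JDBC connection string,
# as suggested above, then reuse the same JDBC write as in the question.
url = "jdbc:sqlserver://{0}:{1};database={2};queryTimeout=7200".format(server, port, database)

df_repartitioned.write \
    .format("jdbc") \
    .mode("overwrite") \
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
    .option("url", url) \
    .option("dbtable", tablename) \
    .option("user", username) \
    .option("password", password) \
    .option("batchsize", batch_size) \
    .save()
```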

The connector below, developed by the Microsoft SQL team, has bulk-load options that can speed up large data loads. Please give it a try:

https://github.com/microsoft/sql-spark-connector
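
A minimal sketch of a write through that connector, assuming the library is installed on the cluster and reusing the variables from the question (the tableLock option is an optional bulk-load hint, not required):

```
# Hypothetical sketch using the Apache Spark connector for SQL Server
# (format "com.microsoft.sqlserver.jdbc.spark") instead of the generic JDBC format.
df_repartitioned.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", tablename) \
    .option("user", username) \
    .option("password", password) \
    .option("tableLock", "true") \
    .option("batchsize", batch_size) \
    .save()
```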


7 REPLIES

-werners-
Esteemed Contributor III

I don't see why you use Databricks for this. There are no data transformations, only moving data from CSV to a database.

Why don't you do this with Data Factory?

The thing is: Databricks will not have issues processing this CSV file, but when you write it to a database, that will become the bottleneck. You also have a cluster running which is basically just waiting.

Thanks for your response!

Actually, I don't know much about Data Factory.

To speed up processing of the big DataFrame, I repartitioned it, following this blog: https://devblogs.microsoft.com/azure-sql/partitioning-on-spark-fast-loading-clustered-columnstore-in...
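
For reference, a rough sketch of that kind of partition tuning: choose the partition count from the row count and a target number of rows per partition, then repartition before the JDBC write (the 1,000,000 target here is purely illustrative, not a figure from the blog):

```
# Hypothetical sketch: derive the partition count from the total row count and a
# target rows-per-partition, then repartition before writing over JDBC.
target_rows_per_partition = 1_000_000  # illustrative figure, tune for your sink

rows = df_lake.count()
new_pts = max(1, (rows + target_rows_per_partition - 1) // target_rows_per_partition)

df_repartitioned = df_lake.repartition(new_pts)
```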

Could you let me know how I can use Data Factory to overwrite all the data from the CSV file into MSSQL? Is it easy?

-werners-
Esteemed Contributor III

Data Factory is pretty easy. Basically, you define a source where the data resides, using a UI.

You define a sink where the data has to land, also using a UI, and then execute a job.

For plain data movement it is ideal. But if you have to add transformations etc., I prefer Databricks (you can also do this in Data Factory, but I do not like the no/low-code tools).

I need to make some changes to each record and then write them into MSSQL, so it looks like Databricks is the better fit.

I just need to speed up writing the big DataFrame into the MSSQL DB and fix the error.

-werners-
Esteemed Contributor III

You can try to land the transformed data on some storage in Azure/AWS, then copy those files to the DB using Data Factory or Glue (AWS).
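
A rough sketch of that landing step, assuming the per-record changes have already been applied and using a hypothetical abfss path (the copy into MSSQL would then be configured in Data Factory or Glue):

```
# Hypothetical sketch: write the transformed DataFrame to Parquet on cloud storage,
# then let Data Factory / Glue handle the copy into the database.
df_transformed = df_repartitioned  # placeholder for the DataFrame after per-record changes

df_transformed.write \
    .mode("overwrite") \
    .parquet("abfss://landing@<storage-account>.dfs.core.windows.net/staging/my_table/")
```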

Sorry, that's not the solution I want.

I want to know what's wrong in my code and why it failed, and I want to improve the performance of big DataFrame processing.

