Community Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.

Retrieve error column/row when writing to sqlmi

JamesY
New Contributor III

I'm working in a Databricks notebook with Scala, using .read() and .write(); the source data is CSV. I got the error below when trying to write data to SQL MI. I understand that it means a value in one of the source columns is longer than the target database column allows, but the message is not enough to track down which specific row is causing the issue.

We could loop through each row, but that would badly hurt performance with millions of rows of data.

*** What are the suggested solutions? ***

Error message:
"Job aborted due to stage failure: Task 0 in stage 35.0 failed 4 times, most recent failure: Lost task 0.3 in stage 35.0 (TID 93) (10.76.206.133 executor 1): com.microsoft.sqlserver.jdbc.SQLServerException: The given value of type NVARCHAR(1581) from the data source cannot be converted to type nvarchar(1024) of the specified target column. at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.validateStringBinaryLengths(SQLServerBulkCopy.java:1690) at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeColumn(SQLServerBulkCopy.java:3006) at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeBatchData(SQLServerBulkCopy.java:3647) at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1566) at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access$300(SQLServerBulkCopy.java:65) at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy$1InsertBulk.doExecute(SQLServerBulkCopy.java:663) at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7418) at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3272) at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:697) at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1654) at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:620) at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.bulkWrite(BulkCopyUtils.scala:110) at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.savePartition(BulkCopyUtils.scala:58) at com.microsoft.sqlserver.jdbc.spark.SingleInstanceWriteStrategies$.$anonfun$write$2(BestEffortSingleInstanceStrategy.scala:43) at com.microsoft.sqlserver.jdbc.spark.SingleInstanceWriteStrategies$.$anonfun$write$2$adapted(BestEffortSingleInstanceStrategy.scala:42) at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1025) at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1025) at 
org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2705) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:161) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:95) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) Driver stacktrace:"

1 ACCEPTED SOLUTION

Accepted Solutions

brockb
Valued Contributor

Thanks JamesY.

Maybe I've misunderstood, but isn't that the goal? To filter out data that will cause the load to SQLMI to fail?

If you're just looking to identify the records that may fail upon write to SQLMI could we just modify the `where` clause as follows:

 

spark.read.format("csv").option("header", "true").load("/path/to/csvs").where(length(col("the_column_in_question")) > 1024)
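When more than one column has a length limit, the same idea can be extended to report which columns each bad row violates. The following is only a minimal sketch: the object name `LengthViolations`, the `violations` column, and the `(column name, limit)` pairs are hypothetical, and the limits would have to be taken from the real target table. It also assumes that Spark's character count is a fair stand-in for the `nvarchar` size, which may not hold for characters outside the Basic Multilingual Plane.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, concat_ws, length, lit, when}

object LengthViolations {
  // limits: (column name, maximum length) pairs, e.g. copied from the target table DDL.
  // Returns only the rows with at least one over-length value, plus a "violations"
  // column listing the offending column names separated by commas.
  def flag(df: DataFrame, limits: Seq[(String, Int)]): DataFrame = {
    // when(...) without otherwise(...) yields null for values that fit (or are null),
    // and concat_ws skips nulls, so only offending column names are kept.
    val perColumn: Seq[Column] = limits.map { case (name, maxLen) =>
      when(length(col(name)) > maxLen, lit(name))
    }
    df.withColumn("violations", concat_ws(",", perColumn: _*))
      .where(col("violations") =!= "")
  }
}
```

Calling `LengthViolations.flag(df, Seq("the_column_in_question" -> 1024))` on the dataframe read from the CSV runs as a single distributed filter, so no row-by-row loop on the driver is needed.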

 

 


6 REPLIES

brockb
Valued Contributor

Hi James,

What is the source of the data you're loading to Azure SQLMI and how are you performing the load? Is it possible to perform a `select * from table where len(candidate_column) <= 1024` or the dataframe equivalent to filter out the invalid rows?

JamesY
New Contributor III

Hi Brockb, sorry, I forgot to mention that: I am working in Scala with the .read() and .write() methods, and the source data is CSV.

brockb
Valued Contributor

Thanks JamesY. I'm not familiar with the limitations of the SQL Server `nvarchar` data type, but is there a way to filter out the rows that will fail using Spark, such as:

spark.read.format("csv").option("header", "true").load("/path/to/csvs").where(length(col("the_column_in_question")) <= 1024).write(...)
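A filter like this can also be turned into a two-way split, so that the rows that would fail are kept for inspection rather than silently dropped. This is a minimal sketch, not a tested recipe for this workload: the object name `LengthSplit` and its helpers are hypothetical. One detail to note is that `length` of a null value is null, so a plain `<= 1024` filter would also drop rows where the column is null; the sketch coalesces the length to 0 to keep them on the valid side.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{coalesce, col, length, lit}

object LengthSplit {
  // True when the value is longer than maxLen. length(null) is null, so it is
  // coalesced to 0, which keeps null values on the "valid" side of the split.
  def tooLong(column: String, maxLen: Int): Column =
    coalesce(length(col(column)), lit(0)) > maxLen

  // Returns (valid, rejected): rows that fit the target column and rows that do not.
  def split(df: DataFrame, column: String, maxLen: Int): (DataFrame, DataFrame) = {
    val cond = tooLong(column, maxLen)
    (df.where(!cond), df.where(cond))
  }
}
```

With `val (valid, rejected) = LengthSplit.split(df, "the_column_in_question", 1024)`, `valid` could be written to SQL MI as before, while `rejected` could be displayed or saved elsewhere for review.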

 

JamesY
New Contributor III

Thanks for your reply. .where() is a great way to filter data, but if any data violates the condition, I get an empty dataframe. Is there a method I can use to track which specific data is causing the issue?

brockb
Valued Contributor

Thanks JamesY.

Maybe I've misunderstood, but isn't that the goal? To filter out data that will cause the load to SQLMI to fail?

If you're just looking to identify the records that may fail upon write to SQLMI could we just modify the `where` clause as follows:

 

spark.read.format("csv").option("header", "true").load("/path/to/csvs").where(length(col("the_column_in_question")) > 1024)

 

 

JamesY
New Contributor III

Hi brockb,

The goal is that when the load to SQLMI fails, I want to locate the invalid data, not just filter it out and pass the valid data through. Sorry for the confusion.
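Since the error message does not name the offending column, one possible first step toward locating the invalid data is to compute the maximum string length of each candidate column in a single aggregation and compare it with the target column sizes. The sketch below makes several assumptions: `ColumnLengths`, `maxLengths`, and `exceeding` are hypothetical names, and the limits must be supplied by hand from the SQL MI table definition. The error above reports a source value of NVARCHAR(1581), so a column whose maximum length comes out as 1581 would be a likely candidate.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, length, max}

object ColumnLengths {
  // Longest value per column, computed in one pass over the data.
  // Columns that contain only nulls are reported as 0. `columns` must be non-empty.
  def maxLengths(df: DataFrame, columns: Seq[String]): Map[String, Int] = {
    val aggs = columns.map(c => max(length(col(c))).as(c))
    val row = df.agg(aggs.head, aggs.tail: _*).head()
    columns.map { c =>
      val i = row.fieldIndex(c)
      c -> (if (row.isNullAt(i)) 0 else row.getInt(i))
    }.toMap
  }

  // Columns whose longest value exceeds the given limit, with that longest length.
  def exceeding(df: DataFrame, limits: Seq[(String, Int)]): Seq[(String, Int)] = {
    val observed = maxLengths(df, limits.map(_._1))
    limits.collect { case (name, limit) if observed(name) > limit => name -> observed(name) }
  }
}
```

Once the offending columns are known, a `where` on `length(col(...)) > limit` for just those columns returns the specific rows.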

 
