
Retrieve error column/row when writing to sqlmi

JamesY
New Contributor III

Databricks notebook, Scala, .read()/.write(), source data is CSV. I got this error when trying to write data to SQLMI. I understand the error indicates that one of the source column values exceeds the length of the target database column, but the message isn't enough to track down which specific row is causing the issue.

We could certainly loop through each row, but that would greatly hurt performance with millions of rows of data.

*** What are the suggested solutions? ***

Error message:
"Job aborted due to stage failure: Task 0 in stage 35.0 failed 4 times, most recent failure: Lost task 0.3 in stage 35.0 (TID 93) (10.76.206.133 executor 1): com.microsoft.sqlserver.jdbc.SQLServerException: The given value of type NVARCHAR(1581) from the data source cannot be converted to type nvarchar(1024) of the specified target column.
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.validateStringBinaryLengths(SQLServerBulkCopy.java:1690)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeColumn(SQLServerBulkCopy.java:3006)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeBatchData(SQLServerBulkCopy.java:3647)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.doInsertBulk(SQLServerBulkCopy.java:1566)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.access$300(SQLServerBulkCopy.java:65)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy$1InsertBulk.doExecute(SQLServerBulkCopy.java:663)
at com.microsoft.sqlserver.jdbc.TDSCommand.execute(IOBuffer.java:7418)
at com.microsoft.sqlserver.jdbc.SQLServerConnection.executeCommand(SQLServerConnection.java:3272)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.sendBulkLoadBCP(SQLServerBulkCopy.java:697)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:1654)
at com.microsoft.sqlserver.jdbc.SQLServerBulkCopy.writeToServer(SQLServerBulkCopy.java:620)
at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.bulkWrite(BulkCopyUtils.scala:110)
at com.microsoft.sqlserver.jdbc.spark.BulkCopyUtils$.savePartition(BulkCopyUtils.scala:58)
at com.microsoft.sqlserver.jdbc.spark.SingleInstanceWriteStrategies$.$anonfun$write$2(BestEffortSingleInstanceStrategy.scala:43)
at com.microsoft.sqlserver.jdbc.spark.SingleInstanceWriteStrategies$.$anonfun$write$2$adapted(BestEffortSingleInstanceStrategy.scala:42)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2(RDD.scala:1025)
at org.apache.spark.rdd.RDD.$anonfun$foreachPartition$2$adapted(RDD.scala:1025)
at org.apache.spark.SparkContext.$anonfun$runJob$2(SparkContext.scala:2705)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:75)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:55)
at org.apache.spark.scheduler.Task.doRunTask(Task.scala:161)
at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:125)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.scheduler.Task.run(Task.scala:95)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$13(Executor.scala:832)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1681)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:835)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:690)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Driver stacktrace:"


6 REPLIES

brockb
Databricks Employee

Hi James,

What is the source of the data you're loading to Azure SQLMI and how are you performing the load? Is it possible to perform a `select * from table where len(candidate_column) <= 1024` or the dataframe equivalent to filter out the invalid rows?
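
In Scala this could be written either as SQL over a temp view or directly against the DataFrame. A minimal sketch of the SQL form, assuming a hypothetical view name `staging_csv` and column `candidate_column`, with Spark SQL's `length()` standing in for SQL Server's `len()`:

// `spark` is the SparkSession a Databricks notebook already provides.
// The CSV path and column name below are placeholders.
val df = spark.read
  .format("csv")
  .option("header", "true")
  .load("/path/to/csvs")

// Register a temp view so the length check can be run as plain SQL.
df.createOrReplaceTempView("staging_csv")

// Rows at or under 1024 characters are the ones that should load into the
// nvarchar(1024) column without error.
val fitting = spark.sql("SELECT * FROM staging_csv WHERE length(candidate_column) <= 1024")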

JamesY
New Contributor III

Hi brockb, sorry I forgot to mention that. I am working with Scala and the .read()/.write() methods, and the source data is CSV.

brockb
Databricks Employee

Thanks JamesY. I'm not familiar with the limitations of the SQL Server `nvarchar` data type, but is there a way we can filter out the rows that will fail using Spark, such as:

spark.read.format("csv").option("header", "true").load("/path/to/csvs").where(length(col("the_column_in_question")) <= 1024).write(...)
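
For reference, a fuller sketch of that filter-and-write in Scala might look like the following. The connector format name is taken from the stack trace above (com.microsoft.sqlserver.jdbc.spark); the path, column name, JDBC URL, table, and credentials are placeholders rather than values from this thread:

import org.apache.spark.sql.functions.{col, length}

val source = spark.read
  .format("csv")
  .option("header", "true")
  .load("/path/to/csvs")

// Keep only the rows that fit the nvarchar(1024) target column.
val fits = source.where(length(col("the_column_in_question")) <= 1024)

// Write with the SQL Server Spark connector seen in the stack trace;
// url, dbtable, user and password are placeholder options.
fits.write
  .format("com.microsoft.sqlserver.jdbc.spark")
  .mode("append")
  .option("url", "jdbc:sqlserver://<sqlmi-host>;databaseName=<db>")
  .option("dbtable", "dbo.target_table")
  .option("user", "<user>")
  .option("password", "<password>")
  .save()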

 

JamesY
New Contributor III

Thanks for your reply. The .where() is a great way to filter data, but if there is data that violates the condition I just get an empty DataFrame. Are there any methods I can use to track down which specific data is causing the issue?

brockb
Databricks Employee (Accepted Solution)

Thanks JamesY.

Maybe I've misunderstood, but isn't that the goal? To filter out data that will cause the load to SQLMI to fail?

If you're just looking to identify the records that may fail upon write to SQLMI, could we just modify the `where` clause as follows:

spark.read.format("csv").option("header", "true").load("/path/to/csvs").where(length(col("the_column_in_question")) > 1024)
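
To actually surface those rows, the inverted filter can be displayed or counted directly; a small sketch along those lines, again with placeholder path and column names:

import org.apache.spark.sql.functions.{col, length}

val offending = spark.read
  .format("csv")
  .option("header", "true")
  .load("/path/to/csvs")
  .where(length(col("the_column_in_question")) > 1024)

// Show the oversized values next to their character lengths so the specific
// rows can be identified; offending.count() gives how many would fail.
offending
  .withColumn("value_length", length(col("the_column_in_question")))
  .show(false)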

 

 

JamesY
New Contributor III

Hi brockb,

The goal is that, when the load to SQLMI fails, I want to locate the invalid data, not just filter it out and pass along the valid data. Sorry for the confusion.
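
One way to sketch that workflow in Scala, under the same placeholder names as the earlier examples, is to split the data on the length condition, keep the oversized rows for investigation, and send only the fitting rows to SQLMI:

import org.apache.spark.sql.functions.{col, length}

val source  = spark.read.format("csv").option("header", "true").load("/path/to/csvs")

// Split on the same length condition instead of discarding anything.
val tooLong = source.where(length(col("the_column_in_question")) > 1024)
val fits    = source.where(length(col("the_column_in_question")) <= 1024)

// Keep the rejected rows somewhere inspectable (this output path is a placeholder)...
tooLong.write.mode("overwrite").format("delta").save("/path/to/rejected_rows")

// ...then write `fits` to SQLMI with the same connector options as in the
// earlier sketch, so every row that reaches the database fits nvarchar(1024).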

 
