cancel
Showing results for 
Search instead for 
Did you mean: 
Community Discussions
Connect with fellow community members to discuss general topics related to the Databricks platform, industry trends, and best practices. Share experiences, ask questions, and foster collaboration within the community.
cancel
Showing results for 
Search instead for 
Did you mean: 

TASK_WRITE_FAILED when trying to write on the table, Databricks (Scala)

chemajar
New Contributor III

Hello,

I have a code on Databricks (Scala) that constructs a df and then write it to a Database table. It is working fine for almost all of the tables, but there is a table with a problem. It says No module named 'delta.connect' - TASK_WRITE_FAILED.

In the penultimate cell I have:

display(data)

 And looks fine (not bad format):

chemajar_2-1710422710695.png

Then, in the last cell I have:

chemajar_5-1710422843604.png

 

The text of error is the following:

 
ModuleNotFoundError: No module named 'delta.connect'
--------------------------------------------------------------------------- _MultiThreadedRendezvous Traceback (most recent call last) File /databricks/spark/python/pyspark/sql/connect/client/core.py:1414, in SparkConnectClient._execute_and_fetch_as_iterator(self, req) 1411 generator = ExecutePlanResponseReattachableIterator( 1412 req, self._stub, self._retry_policy, self._builder.metadata() 1413 ) -> 1414 for b in generator: 1415 yield from handle_response(b) File /usr/lib/python3.10/_collections_abc.py:330, in Generator.__next__(self) 327 """Return the next item from the generator. 328 When exhausted, raise StopIteration. 329 """ --> 330 return self.send(None) File /databricks/spark/python/pyspark/sql/connect/client/reattach.py:131, in ExecutePlanResponseReattachableIterator.send(self, value) 129 def send(self, value: Any) -> pb2.ExecutePlanResponse: 130 # will trigger reattach in case the stream completed without result_complete --> 131 if not self._has_next(): 132 raise StopIteration() File /databricks/spark/python/pyspark/sql/connect/client/reattach.py:188, in ExecutePlanResponseReattachableIterator._has_next(self) 187 self._release_all() --> 188 raise e 189 return False File /databricks/spark/python/pyspark/sql/connect/client/reattach.py:160, in ExecutePlanResponseReattachableIterator._has_next(self) 159 try: --> 160 self._current = self._call_iter( 161 lambda: next(self._iterator) # type: ignore[arg-type] 162 ) 163 except StopIteration: File /databricks/spark/python/pyspark/sql/connect/client/reattach.py:285, in ExecutePlanResponseReattachableIterator._call_iter(self, iter_fun) 284 self._iterator = None --> 285 raise e 286 except Exception as e: 287 # Remove the iterator, so that a new one will be created after retry. File /databricks/spark/python/pyspark/sql/connect/client/reattach.py:267, in ExecutePlanResponseReattachableIterator._call_iter(self, iter_fun) 266 try: --> 267 return iter_fun() 268 except grpc.RpcError as e: File /databricks/spark/python/pyspark/sql/connect/client/reattach.py:161, in ExecutePlanResponseReattachableIterator._has_next.<locals>.<lambda>() 159 try: 160 self._current = self._call_iter( --> 161 lambda: next(self._iterator) # type: ignore[arg-type] 162 ) 163 except StopIteration: File /databricks/python/lib/python3.10/site-packages/grpc/_channel.py:426, in _Rendezvous.__next__(self) 425 def __next__(self😞 --> 426 return self._next() File /databricks/python/lib/python3.10/site-packages/grpc/_channel.py:826, in _MultiThreadedRendezvous._next(self) 825 elif self._state.code is not None: --> 826 raise self _MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.INTERNAL details = "Job aborted due to stage failure: Task 0 in stage 3406.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3406.0 (TID 3637) (172.26.0.11 executor 0): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to abfss://ucatalogwemetastore@azwesadbricksunitycatmst.dfs.core.windows.net/metastore/cddf9e6b-f8c2-4735-a50e-cabc613c06db/tables/58535ee1-b3c7-43d1-afbc-21efde623438. at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:927) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:551) at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:116) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:934) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:934) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60) at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:410) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:407) at org.apache.spark.rdd.RDD.iterator(RDD.scala:374) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146) at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41) at com.databricks.unity.HandleImpl.runWith(UCSHandle..." debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {created_time:"2024-03-14T13:18:45.417841112+00:00", grpc_status:13, grpc_message:"Job aborted due to stage failure: Task 0 in stage 3406.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3406.0 (TID 3637) (172.26.0.11 executor 0): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to abfss://ucatalogwemetastore@azwesadbricksunitycatmst.dfs.core.windows.net/metastore/cddf9e6b-f8c2-4735-a50e-cabc613c06db/tables/58535ee1-b3c7-43d1-afbc-21efde623438.\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:927)\n\tat org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:551)\n\tat org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:116)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:934)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:934)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)\n\tat org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:410)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:407)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:374)\n\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.scheduler.ResultTask.$anonfun$runTask$1(ResultTask.scala:82)\n\tat com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)\n\tat org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)\n\tat org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)\n\tat com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:41)\n\tat com.databricks.unity.HandleImpl.runWith(UCSHandle..."}" > During handling of the above exception, another exception occurred: ModuleNotFoundError Traceback (most recent call last) File <command-2290610805725404>, line 5 1 ( 2 data.write 3 .format("delta") 4 .mode("append") ----> 5 .insertInto(targetTable) 6 ) File /databricks/spark/python/pyspark/sql/connect/readwriter.py:660, in DataFrameWriter.insertInto(self, tableName, overwrite) 658 self._write.table_name = tableName 659 self._write.table_save_method = "insert_into" --> 660 self._spark.client.execute_command(self._write.command(self._spark.client)) File /databricks/spark/python/pyspark/sql/connect/client/core.py:1079, in SparkConnectClient.execute_command(self, command) 1077 req.user_context.user_id = self._user_id 1078 req.plan.command.CopyFrom(command) -> 1079 data, _, _, _, properties = self._execute_and_fetch(req) 1080 if data is not None: 1081 return (data.to_pandas(), properties) File /databricks/spark/python/pyspark/sql/connect/client/core.py:1441, in SparkConnectClient._execute_and_fetch(self, req, self_destruct) 1438 schema: Optional[StructType] = None 1439 properties: Dict[str, Any] = {} -> 1441 for response in self._execute_and_fetch_as_iterator(req): 1442 if isinstance(response, StructType): 1443 schema = response File /databricks/spark/python/pyspark/sql/connect/client/core.py:1422, in SparkConnectClient._execute_and_fetch_as_iterator(self, req) 1420 yield from handle_response(b) 1421 except Exception as error: -> 1422 self._handle_error(error) File /databricks/spark/python/pyspark/sql/connect/client/core.py:1706, in SparkConnectClient._handle_error(self, error) 1693 """ 1694 Handle errors that occur during RPC calls. 1695 (...) 1703 Throws the appropriate internal Python exception. 1704 """ 1705 if isinstance(error, grpc.RpcError): -> 1706 self._handle_rpc_error(error) 1707 elif isinstance(error, ValueError😞 1708 if "Cannot invoke RPC" in str(error) and "closed" in str(error): File /databricks/spark/python/pyspark/sql/connect/client/core.py:1742, in SparkConnectClient._handle_rpc_error(self, rpc_error) 1740 info = error_details_pb2.ErrorInfo() 1741 d.Unpack(info) -> 1742 raise convert_exception(info, status.message) from None 1744 raise SparkConnectGrpcException(status.message) from None 1745 else: File /databricks/spark/python/pyspark/errors/exceptions/connect.py:90, in convert_exception(info, message) 84 return PythonException( 85 "\n An exception was thrown from the Python worker. " 86 "Please see the stack trace below.\n%s" % message 87 ) 89 # BEGIN-EDGE ---> 90 from delta.connect.exceptions import _convert_delta_exception 92 delta_exception = _convert_delta_exception(info, message) 93 if delta_exception is not None: ModuleNotFoundError: No module named 'delta.connect'

 

I am the owner of the table, so I don't think it is an error permission. Maybe of the capacity of the server?, because it says: TASK_WRITE_FAILED

When constructing the data dataframe it conects to MongoDB to obtain the configuration of the table (delimiters, etc) and the name of the fileds. (I think doesn´t have any importance)

If it helps, the original csv has 116 columns, and 3900 files (the rest of the tables doesn't have so many columns).
Maybe the number of columns is a problem?

Any help or suggestion would be appreciatte.

Thanks.

1 REPLY 1

Kaniz_Fatma
Community Manager
Community Manager

Hi @chemajar , It appears that you’re encountering a module import issue related to ‘delta.connect’ when writing data to a database table in Databricks using Scala.

Let’s troubleshoot this together!

The error message “ModuleNotFoundError: No module named ‘delta.connect’” suggests that the required module is not found. Here are some steps you can take to resolve this issue:

  1. Check Dependencies:

    • Ensure that you have the necessary dependencies installed. Specifically, make sure you have the Delta Lake library available. Delta Lake provides features like ACID transactions and time travel for data lakes.
    • You can add the Delta Lake library by including the following line in your Databricks notebook:
      %pip install delta-spark
      
    • After running this command, restart your cluster to apply the changes.
  2. Import Correctly:

    • Double-check that you are importing the correct module. The correct import statement for Delta Lake is usually:
      import io.delta.tables._
      
    • Make sure you are using the correct package name (io.delta.tables) in your code.
  3. Cluster Configuration:

    • Verify that your Databricks cluster is configured correctly. Ensure that it has access to the necessary libraries and packages.
    • If you’re using a custom cluster, make sure it includes the Delta Lake library.
  4. Restart Kernel:

    • Sometimes, a kernel restart can resolve import issues. Try restarting the kernel in your Databricks notebook and then re-run your code.
  5. Check Spark Version:

    • Ensure that your Spark version is compatible with the Delta Lake library. If you’re using an older version of Spark, consider upgrading to a more recent version.
  6. Databricks Connect:

    • If you’re running your code locally using Databricks Connect, make sure you’ve set up Databricks Connect correctly. Refer to the official documentation for instructions on how to use Databricks Connect with Python.

Remember to apply these steps one by one, testing your code after each change. Hopefully, this will help you resolve the issue and successfully write data to your database table! 🚀

If you encounter any further problems or need additional assistance, feel free to ask!

 
Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!