Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

OSError: [Errno 107] Transport endpoint is not connected

nihar_ghude
New Contributor II

Hi,

I am facing this error when performing a write operation inside foreach() on a DataFrame. This piece of code worked fine for over 3 months but started failing last week.

[Screenshot of the error: OSError: [Errno 107] Transport endpoint is not connected]

To give some context, I have a DataFrame extract_df which contains two columns, xml_full_name and content.

I use the below code to write these XMLs to the target folder in ADLS:

extract_df.foreach(write_file)

where write_file is defined as:

def write_file(row):
    with open(row.extract_path, "wb") as f:
        f.write(row.content)
 
The notebook also uses the spark.write command to write some Parquet files, and that is working fine.
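For context on why one write path might fail while the other succeeds: as far as I understand (an assumption on my part, not confirmed anywhere in this thread), spark.write reaches ADLS through the abfss:// connector, while a plain open() inside foreach() runs on the executors and goes through the /dbfs fuse mount, and Errno 107 ("Transport endpoint is not connected") is a typical fuse-layer failure. A minimal sketch of the two paths, with all storage account, container, and mount names made up:

# Connector path: Spark's ADLS driver handles this; no fuse mount involved.
# Account, container, and folder names are illustrative.
extract_df.write.mode("overwrite").parquet(
    "abfss://extracts@myaccount.dfs.core.windows.net/parquet_out/"
)

# POSIX path: open() on an executor goes through the local /dbfs fuse
# mount, which is the layer that raises Errno 107 when it drops.
with open("/dbfs/mnt/extracts/example.xml", "wb") as f:
    f.write(b"<xml/>")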
On investigating, I found that this issue could be related to parallelism, so as a workaround I tried the below:

for row in extract_df.collect():
    with open(row.extract_path, "wb") as f:
        f.write(row.content)

This works, which suggests the connection itself is fine but the parallel writes are the problem. However, this can't be used as a fix, as collecting everything to the driver will bring down the performance.
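As a middle ground I am also considering foreachPartition(), which keeps the writes distributed on the executors but lets me cap how many tasks hit the mount at once by repartitioning first. A rough sketch (the partition count is an arbitrary example, not a tuned value):

def write_partition(rows):
    # Runs once per partition on an executor; rows is an iterator of Rows.
    for row in rows:
        with open(row.extract_path, "wb") as f:
            f.write(row.content)

# Repartitioning caps how many tasks write through the mount concurrently.
extract_df.repartition(16).foreachPartition(write_partition)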
 
Has anyone here faced this issue? If it is related to some configuration, I'd appreciate suggestions on what and where to check. All inputs are welcome.
Thanks.
2 REPLIES

Kaniz_Fatma
Community Manager

Hi @nihar_ghude

  • Instead of using foreach(), consider using foreachBatch(). This method allows you to apply custom logic on the output of each micro-batch, which can help address parallelism issues.
  • Unlike foreach(), which operates on individual rows, foreachBatch() processes entire micro-batches of data.
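A minimal sketch of what that could look like, assuming the extracts arrived via Structured Streaming (the Delta source and checkpoint paths here are purely illustrative):

def write_batch(batch_df, batch_id):
    # batch_df is a plain DataFrame holding one micro-batch.
    for row in batch_df.collect():
        with open(row.extract_path, "wb") as f:
            f.write(row.content)

(spark.readStream
    .format("delta")
    .load("/mnt/extracts/source")                         # illustrative source
    .writeStream
    .foreachBatch(write_batch)
    .option("checkpointLocation", "/mnt/extracts/_chk")   # illustrative path
    .start())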

nihar_ghude
New Contributor II

Thanks for the suggestion. I am not using Structured Streaming in this code, and I'm not sure `foreachBatch()` can be used without it.

I have added a `try` block around the `foreach()` logic and put the `foreachPartition()` logic in an `except` block as a fallback.
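Roughly, the fallback pattern looks like this (a sketch; write_partition is the per-partition helper from my original post, and rewriting already-written files on retry is safe because each open() uses "wb"):

try:
    # Original row-level path.
    extract_df.foreach(write_file)
except Exception:
    # If the fuse mount drops under row-level parallelism, retry with
    # one writer task per partition.
    extract_df.foreachPartition(write_partition)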

Also, the error resolved itself without any changes on our side. We are still awaiting a response from Databricks support as to why it appeared intermittently.
