Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

OSError: [Errno 107] Transport endpoint is not connected

nihar_ghude
New Contributor II

Hi,

I am facing this error when performing a write operation inside foreach() on a DataFrame. The code had been working fine for over three months but started failing last week.


To give some context, I have a DataFrame extract_df which contains two columns: xml_full_name and content.

I use the code below to write these XMLs to the target folder in ADLS:

extract_df.foreach(write_file)

where write_file is defined as:

def write_file(row):
    # Runs on the executors; writes one row's XML bytes to its target path
    with open(row.extract_path, "wb") as f:
        f.write(row.content)
 
The notebook also uses spark.write to write some Parquet files, which is working fine.
While investigating, I found that this issue could be related to parallelism, so as a workaround I tried the following:

for row in extract_df.collect():
    with open(row.extract_path, "wb") as f:
        f.write(row.content)

This works, which suggests the connection is fine and only the parallel writes are failing. However, it can't be used as a fix, since collecting everything to the driver and writing serially would hurt performance.
 
Has anyone here faced this issue? If it's related to some configuration, any suggestions on what and where to check are welcome.
Thanks.

Kaniz_Fatma
Community Manager

Hi @nihar_ghude,

  • Instead of using foreach(), consider using foreachBatch(). This method lets you apply custom logic to the output of each micro-batch, which can help address parallelism issues.
  • Unlike foreach(), which operates on individual rows, foreachBatch() processes entire micro-batches of data (see the sketch after this list).
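A minimal sketch of how this could be wired up; note that foreachBatch() is part of the Structured Streaming writer (DataStreamWriter), so it assumes a streaming DataFrame. extract_stream_df and the checkpoint path here are placeholders:

def handle_batch(batch_df, batch_id):
    # batch_df is a plain DataFrame, so the row-level writer above can be reused
    batch_df.foreach(write_file)

(extract_stream_df  # placeholder: a streaming read of the same extracts
    .writeStream
    .foreachBatch(handle_batch)
    .option("checkpointLocation", "/tmp/checkpoints/extracts")  # placeholder path
    .start())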

nihar_ghude
New Contributor II

Thanks for the suggestion. I am not using Structured Streaming in the code, so I'm not sure I can use `foreachBatch()` without it.

For now, I have added a `try` block around the `foreach()` logic and put the `foreachPartition()` logic in an `except` block as a fallback (sketched below).
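Roughly, the fallback looks like the sketch below, where write_partition is a simplified stand-in for the actual partition-level logic:

def write_partition(rows):
    # Runs once per partition on the executors; rows is an iterator of Row objects
    for row in rows:
        with open(row.extract_path, "wb") as f:
            f.write(row.content)

try:
    extract_df.foreach(write_file)
except Exception:
    # Fall back to partition-level writes if the row-level foreach() fails
    extract_df.foreachPartition(write_partition)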

Also, the error resolved itself without any changes on our side. We are still awaiting a response from Databricks support as to why the error appeared intermittently.
