
OSError: [Errno 107] Transport endpoint is not connected

nihar_ghude
New Contributor II

Hi,

I am facing this error when performing a write operation in foreach() on a DataFrame. The code had been working fine for over 3 months but started failing last week.


To give some context, I have a DataFrame extract_df which contains two columns, xml_full_name and content.

I use the code below to write these XMLs to the target folder in ADLS:

```python
extract_df.foreach(write_file)
```

where `write_file` is defined as:

```python
def write_file(row):
    with open(row.extract_path, "wb") as f:
        f.write(row.content)
```
The notebook also uses spark.write to write some Parquet files, and that works fine.
On investigating, I found that this issue could be related to parallelism, so as a workaround I tried the following:

```python
for row in extract_df.collect():
    with open(row.extract_path, "wb") as f:
        f.write(row.content)
```

This works, which means the connection is fine but the parallel writes are failing. However, it can't be used as a fix, as collecting everything to the driver and writing serially will bring down performance.
Has anyone here faced this issue? If it's related to some configuration, any suggestions on what and where to check are welcome.
Thanks.
2 REPLIES

Kaniz
Community Manager

Hi @nihar_ghude,

  • Instead of using foreach(), consider using foreachBatch(). This method lets you apply custom logic to the output of each micro-batch, which can help address parallelism issues.
  • Unlike foreach(), which operates on individual rows, foreachBatch() processes an entire micro-batch of data at a time; see the sketch below.
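
For reference, here is a minimal sketch of what that could look like. Note that foreachBatch() is part of Structured Streaming, so this assumes a streaming source; the source table name, the trigger, and the write_batch name are illustrative, and the per-row writes reuse the write_file function from the original post:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def write_batch(batch_df, batch_id):
    # batch_df is a plain DataFrame holding one micro-batch, so the
    # existing per-row write logic can be reused unchanged.
    batch_df.foreach(write_file)

(
    spark.readStream
    .table("extracts")            # hypothetical streaming source
    .writeStream
    .foreachBatch(write_batch)
    .trigger(availableNow=True)   # drain the available data once, then stop
    .start()
)
```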

nihar_ghude
New Contributor II

Thanks for the suggestion. I am not using Structured Streaming in this code, so I'm not sure I can use `foreachBatch()` without it.

I have added a `try` block around the `foreach()` logic and a `foreachPartition()` fallback in the `except` block, along the lines of the sketch below.
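
Here, `write_partition` is an illustrative name; since the worker-side OSError surfaces on the driver wrapped in a Spark exception, the except clause is kept broad:

```python
def write_partition(rows):
    # rows is an iterator over the Rows of one partition, so writes
    # still run in parallel across partitions rather than per row.
    for row in rows:
        with open(row.extract_path, "wb") as f:
            f.write(row.content)

try:
    extract_df.foreach(write_file)
except Exception:
    extract_df.foreachPartition(write_partition)
```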

Also, the error resolved itself without any changes on our side. We are still waiting on Databricks support to explain why the error appeared intermittently.
