Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

foreach execution faulty with number of partitions >= worker cores

AlexeyEgorov
New Contributor II

In order to download multiple Wikipedia dumps, I collected the links in a list and wanted to use the foreach method to iterate over those links and apply a UDF that downloads the data into the previously created volume structure. However, I ran into an issue where some files were not downloaded completely. I observed that 1 out of 4 tasks completed almost immediately and produced those incomplete files. After I changed the number of partitions to < 4, the process finished correctly.

The code looks as follows:
import os
from functools import partial

import pandas as pd
import requests

# BASE_URL, volume_path_basic and list_of_links are defined earlier in the notebook.

def download_wikimedia_dump(row, volume: str):
    """
    Download a wikimedia dump file into the given volume.
    """
    link = row["link"]
    url = BASE_URL + link

    # Stream the response to disk in 10 KiB chunks.
    response = requests.get(url, stream=True)
    with open(os.path.join(volume, link), mode="wb") as file:
        for chunk in response.iter_content(chunk_size=10 * 1024):
            file.write(chunk)
    response.close()

# it works with < number of cores of the worker
num_partitions = 3
download_basic = partial(download_wikimedia_dump, volume=volume_path_basic)

sdf_basic = spark.createDataFrame(pd.DataFrame(list_of_links, columns=["link"]))
sdf_basic.repartition(num_partitions).foreach(download_basic)

For now, I am fine with simply setting the value like that, but this is not the behavior I would expect.

Looking forward to your feedback.

Alexey

 

1 REPLY

Walter_C
Databricks Employee

It seems like the issue you're encountering with incomplete file downloads when using the foreach method and a UDF in Spark might be related to the number of partitions and how tasks are distributed across them. Here are a few points to consider:

  1. Task Distribution and Partitioning: When you set the number of partitions to a value greater than the number of cores available on the worker, tasks can be distributed inefficiently. This might cause some tasks to complete almost immediately without fully processing their data, resulting in incomplete file downloads. By keeping the number of partitions below the number of cores, you ensure that each core is fully utilized, leading to more consistent task execution and complete file downloads (see the partition-sizing sketch after this list).

  2. Network and I/O Bound Operations: Downloading files over the network is I/O bound, and having too many concurrent tasks might overwhelm the network or the I/O subsystem, leading to incomplete downloads. Reducing the number of partitions can help mitigate this by limiting the number of concurrent download operations.

  3. Error Handling and Retries: Ensure that your UDF includes proper error handling and retry logic for network operations. This can help recover from transient network issues that might cause incomplete downloads; a minimal retry sketch follows this list.

  4. Resource Allocation: Check the resource allocation for your Spark job. Ensure that there are enough resources (CPU, memory, and network bandwidth) allocated to handle the download tasks efficiently.

  5. Spark Configuration: You might want to experiment with different Spark configurations related to task scheduling and resource allocation to find the optimal settings for your workload.
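For point 3, here is a minimal sketch of what retry logic could look like, reusing BASE_URL and the row structure from the snippet above; the MAX_RETRIES constant, the timeout, and the Content-Length completeness check are illustrative assumptions, not a definitive implementation:

import os
import time

import requests

MAX_RETRIES = 3  # illustrative; tune for your workload

def download_with_retries(row, volume: str):
    """Download one dump file, retrying on transient failures (sketch)."""
    link = row["link"]
    url = BASE_URL + link
    target = os.path.join(volume, link)

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            with requests.get(url, stream=True, timeout=60) as response:
                response.raise_for_status()
                expected = int(response.headers.get("Content-Length", 0))
                with open(target, mode="wb") as file:
                    for chunk in response.iter_content(chunk_size=10 * 1024):
                        file.write(chunk)
            # If the server reported a Content-Length, verify the file size matches.
            if expected and os.path.getsize(target) != expected:
                raise IOError(f"incomplete download: {link}")
            return
        except (requests.RequestException, IOError):
            if attempt == MAX_RETRIES:
                raise  # give up after the last attempt
            time.sleep(2 ** attempt)  # simple exponential backoff

This function can then be passed to foreach exactly like download_wikimedia_dump in the original snippet, via functools.partial.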

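For points 1, 2, and 5, one way to avoid hard-coding the partition count is to derive it from the parallelism Spark reports for the cluster; this is a sketch under that assumption, reusing sdf_basic and download_basic from the snippet above:

# Bound the number of concurrent downloads by the cores Spark actually has,
# leaving one slot free; defaultParallelism typically equals the total worker cores.
num_partitions = max(1, spark.sparkContext.defaultParallelism - 1)
sdf_basic.repartition(num_partitions).foreach(download_basic)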