In order to download multiple Wikipedia dumps, I collected the links in a list and wanted to use the foreach method to iterate over those links and apply a UDF that downloads the data into the previously created volume structure. However, I ran into an issue where some files were not downloaded completely. I observed that 1 out of 4 tasks completed almost immediately and produced those incomplete files. After I changed the number of partitions to fewer than 4, the process finished correctly.
The code looks as follows:
import os
from functools import partial

import pandas as pd
import requests

# BASE_URL, volume_path_basic, list_of_links and spark are defined elsewhere

def download_wikimedia_dump(row, volume: str):
    """
    Download a Wikimedia dump file into the given volume.
    """
    link = row["link"]
    url = BASE_URL + link
    response = requests.get(url, stream=True)
    # stream the response to the volume path in 10 KiB chunks
    with open(os.path.join(volume, link), mode="wb") as file:
        for chunk in response.iter_content(chunk_size=10 * 1024):
            file.write(chunk)
    response.close()

# it works with < number of cores of the worker
num_partitions = 3

download_basic = partial(download_wikimedia_dump, volume=volume_path_basic)
sdf_basic = spark.createDataFrame(pd.DataFrame(list_of_links, columns=["link"]))
sdf_basic.repartition(num_partitions).foreach(download_basic)
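To make the truncation visible instead of silently ending up with partial files, I could imagine a more defensive variant of the UDF along these lines (only a sketch, assuming the server sends a Content-Length header for these files):

def download_wikimedia_dump_checked(row, volume: str):
    """
    Sketch: same download logic, but fail loudly instead of leaving a truncated file.
    Assumes the server sends a Content-Length header (not guaranteed for every response).
    """
    link = row["link"]
    url = BASE_URL + link
    with requests.get(url, stream=True, timeout=60) as response:
        response.raise_for_status()
        expected = int(response.headers.get("Content-Length", -1))
        written = 0
        with open(os.path.join(volume, link), mode="wb") as file:
            for chunk in response.iter_content(chunk_size=10 * 1024):
                file.write(chunk)
                written += len(chunk)
    # a mismatch here would confirm that the task wrote a partial file
    if expected != -1 and written != expected:
        raise IOError(f"{link}: wrote {written} bytes, expected {expected}")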
For now, I am fine with simply setting the value like that, but this is not the behavior I would expect.
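If it helps, the hard-coded partition count could presumably also be derived from the cluster instead of being fixed; a rough sketch using spark.sparkContext.defaultParallelism:

# derive the partition count from the cluster instead of hard-coding it
# (defaultParallelism is typically the total number of cores across the workers)
num_partitions = max(1, spark.sparkContext.defaultParallelism - 1)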
Looking forward to your feedback.
Alexey