08-02-2025 02:02 PM
Hi @Raj_DB
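Since you're still seeing slow writes even after optimizing the cluster, try the staged-writing approach below. It first writes the data unpartitioned to a temporary location, then reads it back and writes it partitioned by year and month: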
```python
import time

# Assumes a Databricks notebook, where `spark` (SparkSession) and `dbutils` are predefined.
def staged_write_approach(df, destination_path):
    """Write in stages to avoid memory pressure."""
    # Unique scratch location next to the destination
    temp_path = f"{destination_path}_temp_{int(time.time())}"
    try:
        # Stage 1: write to the temporary location without partitioning;
        # coalesce(16) limits the write to at most 16 tasks
        print("Stage 1: Writing to temporary location...")
        start_time = time.time()
        df.coalesce(16) \
            .write \
            .mode('overwrite') \
            .format('parquet') \
            .option('compression', 'zstd') \
            .option("maxRecordsPerFile", "15000000") \
            .save(temp_path)
        stage1_time = (time.time() - start_time) / 60
        print(f"Stage 1 completed in {stage1_time:.2f} minutes")

        # Stage 2: read the staged files back and write them partitioned by year/month.
        # With Spark's default (static) partition overwrite mode, 'overwrite'
        # replaces everything under destination_path.
        print("Stage 2: Reading back and applying partitioning...")
        start_time = time.time()
        temp_df = spark.read.parquet(temp_path)
        temp_df.write \
            .partitionBy("year", "month") \
            .mode('overwrite') \
            .format('parquet') \
            .option('compression', 'zstd') \
            .option("maxRecordsPerFile", "15000000") \
            .save(destination_path)
        stage2_time = (time.time() - start_time) / 60
        print(f"Stage 2 completed in {stage2_time:.2f} minutes")
    finally:
        # Cleanup: remove the temporary files even if a stage fails
        dbutils.fs.rm(temp_path, True)

    print(f"Total staged write time: {stage1_time + stage2_time:.2f} minutes")

# Use staged writing (final_df and destination_path come from your existing notebook)
staged_write_approach(final_df, destination_path)
```
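If you want to compare the two stages across runs, the repeated start/stop timing above can be factored into a small context manager. This is a minimal, stdlib-only sketch; `stage_timer` is a hypothetical helper name, and the `sum(range(...))` calls only stand in for the Spark writes so the example runs anywhere:

```python
import time
from contextlib import contextmanager

@contextmanager
def stage_timer(label, timings):
    """Time the enclosed block and record elapsed minutes in timings[label]."""
    start = time.perf_counter()
    print(f"{label}: starting...")
    try:
        yield
    finally:
        # Recorded even if the block raises, so a failed stage still shows its duration
        elapsed_min = (time.perf_counter() - start) / 60
        timings[label] = elapsed_min
        print(f"{label} finished after {elapsed_min:.2f} minutes")

# Usage with placeholder work instead of the actual Spark writes
timings = {}
with stage_timer("Stage 1", timings):
    sum(range(1_000_000))
with stage_timer("Stage 2", timings):
    sum(range(1_000_000))
print(f"Total staged write time: {sum(timings.values()):.2f} minutes")
```

In the notebook you would wrap the Stage 1 `.save(temp_path)` chain and the Stage 2 `.save(destination_path)` chain in the two `with` blocks.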
LR