lingareddy_Alva
Esteemed Contributor

Hi @Raj_DB  

Since you're still facing write performance issues even after cluster optimization,
Just try below Staged Writing approach:

def staged_write_approach(df, destination_path):
    """Write in stages to avoid memory pressure"""
    
    # Stage 1: Write to temporary location without partitioning
    temp_path = f"{destination_path}_temp_{int(time.time())}"
    
    print("Stage 1: Writing to temporary location...")
    start_time = time.time()
    
    df.coalesce(16) \
        .write \
        .mode('overwrite') \
        .format('parquet') \
        .option('compression', 'zstd') \
        .option("maxRecordsPerFile", "15000000") \
        .save(temp_path)
    
    stage1_time = (time.time() - start_time) / 60
    print(f"Stage 1 completed in {stage1_time:.2f} minutes")
    
    # Stage 2: Read back and write with partitioning
    print("Stage 2: Reading back and applying partitioning...")
    start_time = time.time()
    
    temp_df = spark.read.parquet(temp_path)
    
    temp_df.write \
        .partitionBy("year", "month") \
        .mode('overwrite') \
        .format('parquet') \
        .option('compression', 'zstd') \
        .option("maxRecordsPerFile", "15000000") \
        .save(destination_path)
    
    stage2_time = (time.time() - start_time) / 60
    print(f"Stage 2 completed in {stage2_time:.2f} minutes")
    
    # Cleanup
    dbutils.fs.rm(temp_path, True)
    print(f"Total staged write time: {stage1_time + stage2_time:.2f} minutes")

# Use staged writing
staged_write_approach(final_df, destination_path)
LR