09-26-2024 07:42 AM
Hey guys, after some successful data preprocessing without any errors, I end up with a final dataframe of shape ~(200M, 150). The cluster I'm using has sufficient RAM + CPUs + autoscaling, and all metrics look fine after the job is done.
The problem I'm facing is that only about half of my data gets written to the Delta table. Any thoughts on why this is happening? I'd really appreciate some guidance here! Here is my code snippet:
# tried repartitioning to num_cores * 2 and also to 1; also tried without persist, but none of them works
repartition_df = df.repartition(num_cores * 2).persist()

repartition_df.write.format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(output_table)

repartition_df.unpersist()
09-26-2024 07:47 AM
Hi @datastones ,
Are you 100% sure that your final df contains all the rows that you're expecting? Maybe a join condition at an earlier stage is causing this problem?
09-26-2024 07:52 AM
Hey @szymon_dybczak, I appreciate your prompt response! Yup, I check the dataframe shape and show some output along the way prior to writing it out. The shape checks out right before I perform the write... what a mystery this is!
09-26-2024 07:58 AM
One interesting thing I observed when playing around with different repartition sizes (2, 4, 8, 16, 32, 64; I have 32 cores per node): the total written result increased to up to 65% of the final df at repartition = 8, but this is very inconsistent and hard to reproduce...
09-26-2024 09:32 AM
Definitely weird. Since you're writing in Delta format, you shouldn't lose any data. Delta tables support ACID transactions, so either the write commits all the data to your table or it leaves the table in its previous consistent state.
In your situation, I would run df.count() before writing to the Delta table to double check that the dataframe contains the expected number of rows.
You can also get rid of persist in the above code. It's useful if you're going to reuse the same dataframe in another set of calculations, but there's no point in it right before writing to the table.
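Roughly something like this as a sanity check, assuming df and output_table are defined as in your snippet above:

# materialize the row count before the write so you know what to expect
expected_rows = df.count()
print(f"rows before write: {expected_rows}")

# write without persist/repartition and let Delta handle file sizing
(df.write
   .format("delta")
   .mode("overwrite")
   .option("overwriteSchema", "true")
   .saveAsTable(output_table))

# compare against what actually landed in the table
written_rows = spark.table(output_table).count()
print(f"rows after write: {written_rows}, match: {written_rows == expected_rows}")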
09-27-2024 07:32 AM
@szymon_dybczak I could resolve it now! Basically, I broke the process down into further subprocesses. For each subprocess, I cached its output and appended it to the Delta table (without overwriting), and the next subprocess reads the data the previous subprocess wrote to the Delta table. That way I could write all the data into the Delta table without losing any records. So glad the pain has ended now!
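For anyone who lands here later, this is roughly the pattern as a minimal sketch; process_step and num_steps are hypothetical placeholders for the actual preprocessing stages, not the exact code I ran:

for step in range(num_steps):
    # later steps can read whatever the previous steps already committed to the table
    previous_output = spark.table(output_table) if step > 0 else None

    # cache so the count/write of this step doesn't recompute the whole lineage
    step_df = process_step(step, previous_output).cache()

    # append instead of overwrite, so each subprocess adds its records to the table
    step_df.write.format("delta").mode("append").saveAsTable(output_table)

    step_df.unpersist()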
09-27-2024 07:36 AM
Hi @datastones ,
Cool that you've found the solution! Thanks for sharing.