10-14-2021 11:54 AM
Scenario: I have a dataframe that has 5 billion records/rows and 100+ columns. Is there a way to write this in Delta format efficiently? I tried to export it but cancelled after 2 hours (the write didn't finish), as this processing time is not acceptable from a business perspective. What I did so far is to export using the syntax below. I am using Python, btw.
dataframe_name.repartition(50).write.partitionBy("<colA>", "<colB>").format("delta").mode("overwrite").option("overwriteSchema", "true").save("<StoragePathLocation>")
Cluster Config:
Standard_DS4_V2 - 28GB Mem, 8 Cores
10-14-2021 11:50 PM
Since Delta is a transactional format, I'm not sure it is needed here; it may be better to just use Parquet files.
You can read the data source into dataframes in chunks and write with append mode.
If the data is often updated, I think you would need to consider Structured Streaming.
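A rough, hypothetical sketch of the chunked-append idea; the filter column load_date, the batch keys, and the source path are placeholders, not from this thread:

# Read the source in slices and append each slice, so no single write has to
# handle all 5 billion rows at once.
batches = ["2021-10-01", "2021-10-02"]  # hypothetical batch keys
for batch in batches:
    (
        spark.read.format("parquet")
        .load("<SourcePath>")
        .filter(f"load_date = '{batch}'")
        .write.format("delta")
        .mode("append")
        .save("<StoragePathLocation>")
    )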
10-20-2021 07:55 AM
Hello @Hubert Dudek, thank you for your inputs. I haven't heard of Structured Streaming as I only started working with Databricks recently. I am also using ADF to orchestrate the notebook; will this impact the process?
10-20-2021 09:06 AM
Yes, with streaming you can still run it via Azure Data Factory. If you want the stream to run indefinitely, you cannot use a schedule and have to handle possible failures yourself somehow. If you want the stream to finish (time out) after some time, you can use a schedule (for example, time out after 23.9 hours and run every day). In fact, every case is different.
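A minimal sketch of the time-boxed stream idea, assuming Delta sources and sinks; the paths and checkpoint location are placeholders:

# Start the stream, let it run for roughly the scheduled window, then stop it
# so the next ADF-triggered run can start cleanly.
query = (
    spark.readStream.format("delta")
    .load("<SourcePath>")
    .writeStream.format("delta")
    .option("checkpointLocation", "<CheckpointPath>")
    .outputMode("append")
    .start("<StoragePathLocation>")
)
query.awaitTermination(timeout=23.9 * 3600)  # timeout in seconds, ~23.9 hours
query.stop()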
10-15-2021 12:20 AM
Is this a one-time operation or do you want to write these 5 billion records regularly?
If it is a one-time operation you can use the optimized writes of Delta Lake:
set spark.databricks.delta.properties.defaults.autoOptimize.optimizeWrite = true;
set spark.databricks.delta.properties.defaults.autoOptimize.autoCompact = true;
That way the file sizes are handled automatically and you can omit the repartition(50), which causes a lot of shuffling.
Also check whether you really need the overwriteSchema option. I don't know for sure if it has a performance impact, but any check that does not have to be executed is time gained.
New data can be added in append mode or merge mode (if updates are necessary).
If it is a regularly recurring job and you have to overwrite the data over and over (or drop and recreate), I do not see a lot of added value in using Delta Lake, apart maybe from the automated file sizing and read optimizations. But without knowing the downstream data flow, this is hard to assess.
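As a hedged sketch of the append vs. merge options mentioned above; the key column "id", the dataframe name new_df, and the path are placeholders, not from this thread:

from delta.tables import DeltaTable

# Append-only loads of new data:
new_df.write.format("delta").mode("append").save("<StoragePathLocation>")

# Merge (upsert) when updates are necessary:
target = DeltaTable.forPath(spark, "<StoragePathLocation>")
(
    target.alias("t")
    .merge(new_df.alias("s"), "t.id = s.id")
    .whenMatchedUpdateAll()
    .whenNotMatchedInsertAll()
    .execute()
)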
10-20-2021 08:03 AM
Hi @Werner Stinckens, thank you for your inputs.
This is a daily operation, unfortunately. Optimized writes and autoCompact are already enabled. I will try omitting repartition(50). I also tried salting and broadcast joins; they help somewhat but don't make a huge difference.
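A minimal sketch of the broadcast-join approach mentioned above, assuming the smaller side fits in memory; big_df, dim_df, and join_key are placeholder names:

from pyspark.sql import functions as F

# Broadcasting the small side avoids shuffling the 5-billion-row side for the join.
joined = big_df.join(F.broadcast(dim_df), on="join_key", how="left")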
10-20-2021 08:46 AM
Ok, so there are also joins in the picture.
I think you have to check the Spark UI and try to find the issue.
It might be a join that causes this, or data skew.
The repartition will cause a shuffle for sure.
10-20-2021 09:24 AM
If you have 5 billion records, partitioning by two keys may be too much. Each partition should be about 10-50 GB. Also, the repartition(50) may create 50 files in each partition directory.
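A hypothetical sketch following that suggestion: drop the repartition(50) and partition by a single column ("<colA>" is a placeholder, as in the original post):

(
    dataframe_name.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("<colA>")
    .save("<StoragePathLocation>")
)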
10-21-2021 10:18 AM
Hi @Franco Sia,
I would recommend avoiding the repartition(50); instead, enable optimized writes on your Delta table. You can find more details here.
Enable optimized writes and auto compaction on your Delta table. Use AQE (docs here) to get enough shuffle partitions, and also check your joins in case you have any data skew.
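A hedged sketch of those settings, assuming a Databricks runtime with Delta Lake; <table_name> is a placeholder:

# Session-level settings:
spark.conf.set("spark.sql.adaptive.enabled", "true")                    # AQE
spark.conf.set("spark.databricks.delta.optimizeWrite.enabled", "true")  # optimized writes
spark.conf.set("spark.databricks.delta.autoCompact.enabled", "true")    # auto compaction

# Or per table, via table properties:
spark.sql("""
  ALTER TABLE <table_name>
  SET TBLPROPERTIES (
    delta.autoOptimize.optimizeWrite = true,
    delta.autoOptimize.autoCompact = true
  )
""")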