How can I optimize my data pipeline?
01-23-2023 03:55 PM
Delta Lake provides optimizations that can help you accelerate your data lake operations. Here’s how you can improve query speed by optimizing the layout of data in storage.
There are two ways you can optimize your data pipeline: 1) Notebook Optimizations and 2) Jobs Optimizations.
Notebook Optimizations using Delta Write Optimize
Compaction (bin-packing)
Improve the speed of read queries from a table simply by adding a few lines of code. Bin-packing aims to produce evenly-balanced data files with respect to their size on disk, but not necessarily number of tuples per file.
Z-Ordering (multi-dimensional clustering)
Z-Ordering is a technique to colocate related information in the same set of files, dramatically reducing the amount of data that Delta Lake needs to read when executing a query.
Trigger compaction by running the OPTIMIZE command, and trigger Z-Ordering by adding a ZORDER BY clause to the OPTIMIZE command. Find the syntax for both here.
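As a rough illustration of the two statements, here is a minimal Python sketch that builds them as strings. The table name `events` and the columns `event_type` and `event_date` are hypothetical, and the helper `build_optimize_sql` is made up for this example, not part of Delta Lake.

```python
import re

# Conservative identifier check so the helper only emits simple names.
_IDENT = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*$")


def build_optimize_sql(table, zorder_cols=()):
    """Return an OPTIMIZE statement, with an optional ZORDER BY clause."""
    for name in [table, *zorder_cols]:
        if not _IDENT.match(name):
            raise ValueError(f"unexpected identifier: {name!r}")
    stmt = f"OPTIMIZE {table}"
    if zorder_cols:
        # Z-Ordering is requested as a clause of OPTIMIZE.
        stmt += " ZORDER BY (" + ", ".join(zorder_cols) + ")"
    return stmt


# Hypothetical table and column names, for illustration only.
compaction_sql = build_optimize_sql("events")
zorder_sql = build_optimize_sql("events", ["event_type", "event_date"])
print(compaction_sql)
print(zorder_sql)
```

In a Databricks notebook, where `spark` is already defined, you would then run something like `spark.sql(zorder_sql)`. Z-Ordering tends to pay off on columns that appear often in query filters.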
Jobs Optimizations using Workflows
Reduce the manual effort required by stringing your single-task jobs together into one multi-task job. To create a multi-task job, use the tasks field in JobSettings to specify the settings for each task. Find an example of a job with two notebook tasks here.
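To give an idea of the shape, here is a minimal sketch of job settings with two notebook tasks, written as a Python dict and serialized to JSON. The job name, task keys, notebook paths and the cluster ID placeholder are all hypothetical, and the field names follow my understanding of the Jobs API 2.1, so please check them against the official reference.

```python
import json

# Sketch of JobSettings with two notebook tasks (all values are placeholders).
job_settings = {
    "name": "example-multi-task-job",
    "tasks": [
        {
            "task_key": "ingest",
            "notebook_task": {"notebook_path": "/Shared/ingest"},
            "existing_cluster_id": "<cluster-id>",
        },
        {
            "task_key": "transform",
            # Runs only after the "ingest" task has finished successfully.
            "depends_on": [{"task_key": "ingest"}],
            "notebook_task": {"notebook_path": "/Shared/transform"},
            "existing_cluster_id": "<cluster-id>",
        },
    ],
}

# JSON body that could be sent when creating the job.
payload = json.dumps(job_settings, indent=2)
print(payload)
```

The `depends_on` entry is what chains the two tasks, so the second notebook no longer has to be triggered by hand.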
Drop your questions, feedback and tips below!
01-23-2023 09:41 PM
Thanks for sharing.
01-24-2023 10:40 AM
Some tips from me:
- Look for data skew: some partitions can be huge and others small because of incorrect partitioning. You can check this in the Spark UI, but also debug your code a bit (call getNumPartitions()). Reads from SQL in particular can split the data unevenly across partitions (the connector has settings for this, such as lowerBound). Try setting the number of partitions to the workers' total cores multiplied by X, so that they are processed step by step in a queue.
- Check for data spills in the Spark UI; a spill means that shuffle partitions are written from RAM to disk and read back. (The 25th, 50th, and 75th percentiles should be similar.) Increase the number of shuffle partitions if they frequently have to be written to disk.
- Increase the number of shuffle partitions: spark.sql.shuffle.partitions defaults to 200, so try a larger value. Calculate it as the data size divided by the target partition size.
- Make the driver twice the size of an executor. To find the optimal size, analyze the load: in Databricks, the cluster's Metrics tab shows Ganglia, or even better, integrate Datadog with the cluster.
- Check wide transformations, the ones that need to shuffle data between partitions, and group them so that only one shuffle is needed.
- If you need to filter data, apply the filter right after reading from SQL where possible, so that predicate pushdown adds it as a WHERE clause to the SQL query.
- Make sure that everything runs in a distributed way, especially UDFs. Vectorized pandas UDFs help because they run on the executors, but even pandas UDFs can be slow. As an alternative, try registering a SQL function in the metastore and using it in your SQL queries.
- Don't use collect() and similar actions: they do not run in a distributed way but load everything into an in-memory object on the driver (it is like executing the notebook locally).
- Regarding infrastructure, use more workers and check that your ADLS is connected via a private link.
- You can also use premium ADLS which is faster.
- Sometimes I process big data as a stream, as that is easier with big data sets. In that scenario, you would need Kafka (Confluent Cloud is excellent) between SQL and Databricks. If you use the stream for one-time processing, use the AvailableNow trigger instead of Once.
- When you read everything, reading from big unpartitioned files (around 200 MB each) is faster, so for a Delta table you can run OPTIMIZE before reading. Of course, if you read only, for example, the last x days, partitioning by day will be faster, but running OPTIMIZE can help anyway.
- Regarding writing, by default it is best to get as many files as there are cores/partitions, so that every core writes one file in parallel. Later you can merge them by running OPTIMIZE. Check that the files are similar in size; if not, you probably have skew in your partitions. In a huge dataset, it is sometimes good to add a salt number and then partition by that number to make the partitions equal (you can get the number of cores from the SparkContext.defaultParallelism property).
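Two of the tips above, sizing the shuffle partitions and salting a skewed key, can be sketched in plain Python. This is only an illustration under my own assumptions: the 128 MiB target partition size, the helper names `suggest_shuffle_partitions` and `salted_key`, and the sample keys are all made up for the example and should be tuned to your own load.

```python
import math
import random
from collections import Counter


def suggest_shuffle_partitions(data_size_bytes,
                               target_partition_bytes=128 * 1024**2,
                               total_cores=None):
    """Data size divided by the target partition size, rounded up.

    If total_cores is given, round up to a multiple of it so that
    partitions are processed in full waves across the cores.
    """
    partitions = max(1, math.ceil(data_size_bytes / target_partition_bytes))
    if total_cores:
        partitions = math.ceil(partitions / total_cores) * total_cores
    return partitions


def salted_key(key, num_salts, rng):
    """Append a random salt so one hot key spreads over num_salts buckets."""
    return f"{key}_{rng.randrange(num_salts)}"


# Assumed example: 100 GiB of shuffle data on a cluster with 32 cores.
n = suggest_shuffle_partitions(100 * 1024**3, total_cores=32)
print(n)

# Skewed sample: one "hot" key dominates the data set.
rng = random.Random(0)
keys = ["hot"] * 9000 + ["cold"] * 1000
before = Counter(keys)
after = Counter(salted_key(k, 8, rng) for k in keys)
print(max(before.values()), max(after.values()))
```

On a cluster you would then apply the value with `spark.conf.set("spark.sql.shuffle.partitions", n)`. The salt trades one huge partition for several smaller ones, so any aggregation on the original key needs a second step that combines the salted groups.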
01-30-2023 01:51 PM
Thank you for sharing your great tips @Hubert Dudek