cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

How can I optimize my data pipeline?

User16835756816
Valued Contributor

Delta Lake provides optimizations that can help you accelerate your data lake operations. Here’s how you can improve query speed by optimizing the layout of data in storage.

There are two ways you can optimize your data pipeline: 1) Notebook Optimizations and 2) Jobs Optimizations

Notebook Optimizations using Delta Write Optimize

Compaction (bin-packing)

Improve the speed of read queries from a table simply by adding a few lines of code. Bin-packing aims to produce evenly-balanced data files with respect to their size on disk, but not necessarily number of tuples per file. 

Z-Ordering (multi-dimensional clustering)

Z-Ordering is a technique to colocate related information in the same set of files, dramatically reducing the amount of data that Delta Lake needs to read when executing a query.

Trigger compaction by running the OPTIMIZE command and trigger Z-Ordering by running the ZORDER BY command. Find the syntax for both here.

Jobs Optimizations using Workflows

Reduce the manual effort required by stringing your single-task jobs into a multi-task format. To create a multi-task format job, use the tasks field in JobSettings to specify settings for each task. Find an example of a job with two notebook tasks here.

Drop your questions, feedback and tips below!

3 REPLIES 3

Ajay-Pandey
Esteemed Contributor III

Thanks for sharing.

Hubert-Dudek
Esteemed Contributor III

some tips from me:

  • Look for data skews; some partitions can be huge, some small because of incorrect partitioning. You can use Spark UI to do that but also debug your code a bit (get getNumPartitions()), especially SQL can divide it unequally to partitions (there are settings in connector lowerBound, etc.). You could try to have the number of partitions as workers' cores multiply by X (so they will be processed step by step in the queue),
  • Check for data spills in Spark UI as they mean writing shuffle partitions from RAM to disks and back. (25th, 50th, and 75th percentile should be similar). Increase shuffle partitions if they have to be frequently written to disk.
  • Increase shuffle size spark.SQL.shuffle.partitions default is 200 try bigger, you should calculate it as data size divided by the size of the partition,
  • Increase the size of the driver to be two times bigger than the executor (but to get the optimal size, please analyze load - in databricks on cluster tab look to Metrics there is Ganglia or even better integrate datadog with cluster),
  • Check wide transformations, ones that need to shuffle data between partitions, group them to do one shuffle only,
  • If you need to filter data, if possible, do it after reading from SQL so it will be predicative push so it will add where in SQL query,
  • Make sure that everything runs in a distributed way, specially UDF. It would help if you used vectorized pandas udfs so that they will run on executors. Even pandas UDFs can be slow. You can try instead of registering the SQL function in metastore and use it in your SQL queries.
  • Don't use collect etc., as they are not running in the distributed way but instead are loading everything to memory object on the driver (it is like executing notebook locally)
  • Regarding infrastructure, use more workers and check that your ADLS is connected via a private link.
  • You can also use premium ADLS which is faster.
  • Sometimes I process big data as a stream as it is easier with big data sets. In that scenario, you would need Kafka (confluent cloud is excellent) between SQL and Databricks. If you use the stream for one-time processing, please use AvailableNow instead of Once.
  • When you read data, remember that if you read everything from a big unpartitioned file (around 200 MB each) will be faster. So for a delta, you can OPTIMIZE before reading. Of course, if you read, for example, last x days partitioning per day will be faster, but running OPTIMIZE can help anyway.
  • Regarding writing, the best is that you get as many files as cores/partitions by default, so every core in parallel is working on writing one file. Later you can merge them by mentioning OPTIMIZE. Please check that every file is similar in size. If not, it seems that you have a skew in partitions. In a huge dataset, sometimes it is good to salt number and then partition by that number to make than equal (you can get the number of cores from SparkContext.DefaultParallelism property)

Thank you for sharing your great tips @Hubert Dudek​ 

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!