Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Predicate pushdown query

kasiviss42
New Contributor III

Does predicate pushdown work when we apply a filter to a DataFrame that reads a Delta table, and the filter list has 2 lakh (200,000) values?

i.e. filter condition: column IN (list), where the list contains 2 lakh elements.

I need to get n columns from a table, and I am currently using a join for this: 2 lakh records joined to 1 billion records.

If I add the filter on the 1-billion-row table before the join, will it improve performance?

 

 

 

 

3 REPLIES

hari-prasad
Valued Contributor II

Hi @kasiviss42,

This might sound like a rhetorical question, but let's delve into the complexity of joins and filters and examine how generating a list of 2 lakh values affects it.

 

Let's assume we have a fact table with 1 billion records and a dimension table with 2 lakh records:

Fact Table = 1 Billion
Dimension Table = 2 lakh

Join - Complexity Considerations:

  • Data Volume: Joining 1 billion rows with 200,000 unique IDs can be computationally expensive.
  • Data Shuffling: The join operation may require significant data shuffling across the cluster nodes, which can be time-consuming and resource intensive.
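  • Indexing: If the join columns (ID columns) are indexed, performance can improve; without indexes, the join will be slower. Delta tables do not have conventional B-tree indexes, so the closest equivalents there are file-level statistics and a data layout (partitioning, Z-ordering) organized on the join column.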
  • Result Size: The result set includes columns from both tables, which can increase the amount of data processed and transferred.
  • Reference example: the join query below on the id column takes 0.45 to 0.60 seconds on average. This may vary with data volume and the number of join conditions.
    [Screenshot: hariprasad_1-1737199034992.png]
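For reference, a join like the one discussed above might be sketched in PySpark as follows. This is only an illustration under assumptions: the function name `join_fact_to_dim`, the key column `id`, and the `fact_cols` parameter are hypothetical and are not taken from the screenshot.

```python
def join_fact_to_dim(fact_df, dim_df, key="id", fact_cols=()):
    """Inner-join a large fact DataFrame to a small dimension DataFrame.

    fact_df and dim_df are expected to be pyspark.sql.DataFrame objects.
    All names here (key, fact_cols) are hypothetical placeholders.
    """
    if fact_cols:
        # Keep only the join key plus the columns actually needed, so the
        # result carries less data (see the "Result Size" point above).
        fact_df = fact_df.select(key, *fact_cols)
    # Joining on a column name (a string) keeps a single copy of the key
    # column in the output instead of one per side.
    return fact_df.join(dim_df, on=key, how="inner")
```

With a real session this could be called as `join_fact_to_dim(spark.table("fact"), spark.table("dim"), fact_cols=["amount"])`, where the table and column names are placeholders. Calling `.explain()` on the result shows which join strategy Spark picked.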

Filter - Complexity Considerations:

  • Filter Operation: This query applies a filter to the fact table, which is generally less complex than a join operation.
  • Data Scanning: The performance of the filter operation depends on the size of the fact table. If the table is large, scanning it can still be time-consuming.
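  • Indexing: If the ID column is indexed, the filter can be significantly faster; without an index, the query performs a full table scan. On a Delta table the comparable mechanism is data skipping, which uses file-level min/max statistics to avoid reading files that cannot match.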
  • Result Size: The query selects all columns (*), which can increase the amount of data processed and transferred, but it is still likely to be less than the join operation.
  • Reference examples:
    • The filter below uses a sub-query on the dimension table and takes 0.35 to 0.50 seconds on average. This may vary with data volume and indexing.
      [Screenshot: hariprasad_3-1737199129693.png]
    • The filter below uses manually listed values and takes around 0.21 to 0.35 seconds. This may vary with data volume and indexing.
      [Screenshot: hariprasad_5-1737200470951.png]
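In the DataFrame API, the sub-query style filter (`WHERE id IN (SELECT id FROM dim)`) is commonly written as a left semi join. The sketch below is a hedged illustration rather than the code from the screenshots; `filter_by_dim_keys` and the key column `id` are hypothetical names.

```python
def filter_by_dim_keys(fact_df, dim_df, key="id"):
    """Keep only the fact rows whose key also appears in dim_df.

    fact_df and dim_df are expected to be pyspark.sql.DataFrame objects.
    A left semi join returns columns from the left (fact) side only and
    never duplicates fact rows, which matches IN (sub-query) semantics.
    """
    # Only the key column of the dimension table is needed for the check.
    dim_keys = dim_df.select(key)
    return fact_df.join(dim_keys, on=key, how="left_semi")
```

To see what actually reaches the scan, call `.explain()` on the returned DataFrame. In open-source Spark the Parquet scan node lists a `PushedFilters` entry; how this is displayed for Delta tables on Databricks may differ, so treat that detail as something to verify on your own cluster.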

 

Now let's talk about generating the 2 lakh values in Python or Scala and passing them to the select query as a filter.

Consider roughly the time taken by the steps in PySpark: read data >> collect from the DataFrame >> loop through the values >> join the values and pass them to the filter. Reading and preparing the values adds time on top of the query itself. The code below takes 0.57 to 0.80 seconds on average, depending on the number of values from the dimension table.

[Screenshot: hariprasad_4-1737200250275.png]
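The collect-then-filter steps described above could look roughly like this. It is a minimal sketch with hypothetical names (`collect_keys`, `filter_by_key_list`, key column `id`), not the code from the screenshot.

```python
def collect_keys(dim_df, key="id"):
    """Bring the distinct key values of dim_df back to the driver as a list.

    dim_df is expected to be a pyspark.sql.DataFrame. collect() moves
    every key to the driver, so this only suits small dimension tables.
    """
    rows = dim_df.select(key).distinct().collect()
    return [row[key] for row in rows]


def filter_by_key_list(fact_df, keys, key="id"):
    """Filter fact_df with an IN-list built from a Python list of keys."""
    # Column.isin accepts a list; every value becomes a literal in the
    # query plan, so a 2 lakh element list makes the plan itself large.
    return fact_df.filter(fact_df[key].isin(keys))
```

One thing to keep in mind with this route: in open-source Spark, the Parquet reader only pushes an IN-list down value by value when the list is short (governed by `spark.sql.parquet.pushdown.inFilterThreshold`, default 10); longer lists are pushed as a min/max range instead. So a 2 lakh element list is unlikely to be pushed down as 2 lakh individual comparisons.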

 

Hope this helps.

Regards,
Hari Prasad

 




Thanks for the detailed analysis.

 

My scenario is that the same base DataFrame with 2 lakh unique records goes through the entire notebook, passing through 10-15 joins with tables of around 1-5 billion records each.

So it might take time initially to populate the list variable, but I can reuse the same list across all 10-15 joins, provided this really improves performance.

hari-prasad
Valued Contributor II

I recommend using a Join or Filter with a subquery for a safer approach. Manual looping can increase runtime, especially as the number of records in your dimension table grows.

If you are performing repeated joins across multiple blocks or stages and your dimension table consistently contains around 2 lakh records, consider using a broadcast join. This means marking the dimension DataFrame for broadcast, so that a copy is sent to every executor and the large table does not have to be shuffled for the join; the same DataFrame can then be used across multiple joins.
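A rough sketch of reusing one broadcast-hinted DataFrame across several joins is shown below. The helper name `join_each_with_broadcast` and the key column `id` are hypothetical; `DataFrame.hint("broadcast")` is used here, and `pyspark.sql.functions.broadcast(dim_df)` is an equivalent way to give the same hint.

```python
def join_each_with_broadcast(dim_df, fact_dfs, key="id"):
    """Join one small DataFrame against several large ones.

    dim_df and each item of fact_dfs are expected to be
    pyspark.sql.DataFrame objects. The broadcast hint asks Spark to ship
    the small side to every executor instead of shuffling the large side.
    """
    # The hint is attached once and the hinted DataFrame is reused below.
    small = dim_df.hint("broadcast")
    return [fact.join(small, on=key, how="inner") for fact in fact_dfs]
```

Without a hint, Spark only broadcasts automatically when it estimates the small side to be under `spark.sql.autoBroadcastJoinThreshold` (10 MB by default), so an explicit hint can matter if the 2 lakh rows are wide. If the base DataFrame is itself the result of expensive transformations, caching it before the joins may also help, though that depends on the workload.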



Regards,
Hari Prasad
