DictionaryFilters Pushdown on Views

Steffen
New Contributor III

Hello

I have a very simple time series table with three columns:

  • id (long): unique id of signal
  • ts (unix timestamp): timestamp of the event in unix timestamp format
  • value (double): value of the signal at the given timestamp

For every second there is one entry for each signal.

I want to create a view with the 1-minute average of the signals, but I am having trouble getting the DictionaryFilters to push down the ts filter.

My view is currently defined like this:

SELECT
  to_timestamp(from_unixtime(FLOOR((ts - 1) / 60) * 60 + 60)) AS ts,
  id,
  AVG(value) AS avg
FROM
  measurements
GROUP BY
  id,
  FLOOR((ts - 1) / 60)

If I run a SELECT on this view with a WHERE filter on id and ts, the plan shows that only the id filter is pushed down:
DictionaryFilters: [(id#10256L = 1234)]

But if I query the base table directly with the same WHERE filter, both filters are pushed down:

DictionaryFilters: [(id#10917L = 1234), ((ts#10920L >= 1751328000) AND (ts#10920L <= 1751414400))]

I tried to adapt the view to not use GROUP BY, but with no success. Every time I use some sort of AVG() OVER or similar, the ts filter is no longer pushed down.

Is there a way to create a view where a WHERE ts BETWEEN x AND y filter will be pushed down to the DictionaryFilters?

Thanks

Steffen

1 ACCEPTED SOLUTION

Accepted solution: see szymon_dybczak's second reply below.
4 REPLIES

szymon_dybczak
Esteemed Contributor III

Hi @Steffen,

 

This happens because you're applying functions such as FLOOR and from_unixtime to the ts attribute, which hide the raw ts from Spark's optimizer, so it can't push down filters.

If you can, try adding an additional attribute to your upstream table with the conversion below:

to_timestamp(from_unixtime(FLOOR((ts - 1) / 60) * 60 + 60)) 

Then you won't need to use any functions in your view, and that should let the optimizer do its job.
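
A minimal sketch of that change, assuming the upstream table is a Delta table named measurements that you're allowed to modify (UPDATE requires Delta; the column name ts_aligned is illustrative):

# Add a pre-aligned timestamp column to the upstream table
# (assumes a Delta table named `measurements`; `ts_aligned` is illustrative).
spark.sql("ALTER TABLE measurements ADD COLUMNS (ts_aligned TIMESTAMP)")

# Backfill it once; note that this rewrites the table's data files.
spark.sql("""
    UPDATE measurements
    SET ts_aligned = to_timestamp(from_unixtime(FLOOR((ts - 1) / 60) * 60 + 60))
""")

The view can then select and group by ts_aligned directly, with no functions applied on top of it, so the filter stays eligible for pushdown.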

Steffen
New Contributor III

I don't think to_timestamp() is the problem, but rather the GROUP BY, or more precisely the aggregate over ts?

Even if I keep ts raw, this does not work.

E.g. with this view definition, the ts filter is still not pushed down:

SELECT
    id,
    ts,
    AVG(value) OVER (ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS avg_value
FROM
    measurements

 

szymon_dybczak
Esteemed Contributor III

Hi @Steffen,

to_timestamp() is a function, so it has the same effect. Let me show you with an example. I recreated your data, as you can see in the screenshot below:


szymon_dybczak_0-1752672125836.png
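
For readers without the screenshot, the sample data can be recreated along these lines (a minimal sketch; the row count and values are illustrative):

from pyspark.sql import functions as F

# One reading per second for two signals (ids 1 and 2); values are random.
# The table name `raw_measurements` matches the queries below.
seconds = spark.range(1751328001, 1751328201).withColumnRenamed("id", "ts")
signals = spark.range(1, 3).withColumnRenamed("id", "signal_id")

(seconds.crossJoin(signals)
    .select(
        F.col("signal_id").alias("id"),
        F.col("ts"),
        (F.rand() * 100).alias("value"))
    .write.mode("overwrite")
    .saveAsTable("raw_measurements"))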


Now, for the sake of the example, I didn't apply any functions to the attributes, and the filter pushdown works as expected:

df = spark.sql("""
    SELECT
        ts,
        id,
        AVG(value) AS avg
    FROM raw_measurements
    GROUP BY
        id,
        ts
""")

df.createOrReplaceTempView("test_view")

query = f"""
SELECT *
FROM test_view
WHERE id = 1 AND ts BETWEEN 1751328004 AND 1751328104
"""
#display(spark.sql(query))
# Show logical/physical plan to inspect pushdown
spark.sql(query).explain(True)

 

szymon_dybczak_1-1752672472215.png

Now, let's try applying the to_timestamp function to the ts attribute:

df = spark.sql("""
    SELECT
        id,
        to_timestamp(from_unixtime(FLOOR((ts - 1) / 60) * 60 + 60)) AS ts,
        AVG(value) AS avg
    FROM raw_measurements
    GROUP BY
        id,
        ts
""")

df.createOrReplaceTempView("test_view")

query = f"""
SELECT *
FROM test_view
WHERE id = 1 AND ts BETWEEN  '2025-07-01T00:54:00.000+00:00' AND '2025-07-02T00:54:00.000+00:00'
"""
#display(spark.sql(query))
# Show logical/physical plan to inspect pushdown
spark.sql(query).explain(True)

As you can see, applying even a simple function to an attribute can prevent the Spark SQL optimizer from kicking in:

szymon_dybczak_2-1752672760266.png


Your second example is a bit different though:


SELECT
    id,
    ts,
    AVG(value) OVER (ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS avg_value
FROM
    measurements

Here I believe you've run into a case similar to the one described at the link below:

[SPARK-23985] predicate push down doesn't work with simple compound partition spec - ASF JIRA

In short, filters are pushed down only if the filtered columns appear in the partitionSpec of the window function. So, when you're using it like this:

df = spark.sql("""
    SELECT
        id,
        ts,
        AVG(value) OVER (ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS avg_value
    FROM
        raw_measurements
""")

df.createOrReplaceTempView("test_view")


query = f"""
SELECT *
FROM test_view
WHERE id = 1 AND ts BETWEEN 1751328004 AND 1751328104
"""
#display(spark.sql(query))
# Show logical/physical plan to inspect pushdown
spark.sql(query).explain(True)


Then the filters are not pushed down:

szymon_dybczak_3-1752673059756.png

But when you add the ts attribute to the PARTITION BY clause, the optimizer does its job (a sketch of this variant follows the screenshots):

szymon_dybczak_4-1752673160387.png

 

szymon_dybczak_5-1752673195410.png
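
The exact window spec used in the screenshots isn't visible as text, but a variant along the following lines, with both filtered columns in the PARTITION BY, lets the filters push down, consistent with SPARK-23985. Note that partitioning by id and ts also changes what AVG computes, since each partition then contains a single (id, ts) pair, so this mainly illustrates the optimizer behavior:

df = spark.sql("""
    SELECT
        id,
        ts,
        AVG(value) OVER (PARTITION BY id, ts) AS avg_value
    FROM
        raw_measurements
""")

df.createOrReplaceTempView("test_view")

query = """
SELECT *
FROM test_view
WHERE id = 1 AND ts BETWEEN 1751328004 AND 1751328104
"""
# Both predicates now appear in the window's partitionSpec,
# so they can be pushed below the Window operator.
spark.sql(query).explain(True)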

 

And finally, what I recommended was to compute the following attribute in the upstream table:

to_timestamp(from_unixtime(FLOOR((ts - 1) / 60) * 60 + 60)) as ts_aligned

And then create a view:

df = spark.sql("""
    SELECT
        ts_aligned,
        id,
        AVG(value) AS avg
    FROM
        raw_measurements
    GROUP BY
        id,
        ts_aligned
""")

df.createOrReplaceTempView("test_view")

 

Now the optimizer is again able to push the filter down to the source:

szymon_dybczak_6-1752673811801.png

 

 

Steffen
New Contributor III

Thank you for the detailed explanation!

Our upstream table is very big (300,000+ signals and over 5 years of data), so adding a new column means rewriting the whole table. In the end we would like to have multiple views that resample at different intervals, e.g. 60 sec, 15 min, 60 min, 1 day. We would then need to add all the aligned timestamps as columns in the upstream table, and adding a new interval later would again mean rewriting the whole table.

I could not come up with a way to get the average per timespan without using some sort of function (like FLOOR) on the ts column, so I guess to achieve what we need, we would need the ts_aligned columns in the table.

Since the workaround is to not create views at all and always query the measurements table directly (as in the sketch below), we need to weigh that against the cost of rewriting the measurements table with the new columns.
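
A minimal sketch of that direct query, reusing the id and ts range from the plan output quoted in the question (the literals are illustrative):

result = spark.sql("""
    SELECT
        to_timestamp(from_unixtime(FLOOR((ts - 1) / 60) * 60 + 60)) AS ts_aligned,
        id,
        AVG(value) AS avg
    FROM measurements
    -- the filter references the raw columns, so it is pushed down to the scan
    WHERE id = 1234 AND ts BETWEEN 1751328000 AND 1751414400
    GROUP BY id, FLOOR((ts - 1) / 60)
""")
result.explain(True)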

 
