07-16-2025 12:18 AM
Hello
I have a very simple table with time series data with three columns: ts, id, and value.
For every second there is one entry for each signal.
I want to create a view with a 1 minute average of the signals, but I am having trouble getting the ts filter pushed down to the DictionaryFilters.
My view is currently defined like this:
SELECT
to_timestamp(from_unixtime(FLOOR((ts - 1) / 60) * 60 + 60)) AS ts,
id,
AVG(value) AS avg
FROM
measurements
GROUP BY
id,
FLOOR((ts - 1) / 60)
If I run a SELECT on this view with a WHERE filter on id and ts, the plan shows that the filter is only pushed down for the id:
DictionaryFilters: [(id#10256L = 1234)]
But if I query the base table directly and apply the same WHERE filter there, it is pushed down:
DictionaryFilters: [(id#10917L = 1234), ((ts#10920L >= 1751328000) AND (ts#10920L <= 1751414400))]
I tried to adapt the view to not use GROUP BY, but with no success. Every time I use some sort of AVG() OVER or something else, the ts filter is not pushed down anymore.
Is there a way to create a view where I can filter with WHERE ts BETWEEN x AND y and have it pushed down to the DictionaryFilters?
Thanks
Steffen
07-16-2025 03:00 AM
Hi @Steffen ,
This happens because you're applying functions to the ts attribute (FLOOR, from_unixtime, etc.), which hides the raw ts from Spark's optimizer, so it can't push down the filter.
If you can, try to add an additional attribute to your upstream table with the conversion below:
to_timestamp(from_unixtime(FLOOR((ts - 1) / 60) * 60 + 60))
Then you won't need to use any functions in your view and it should help the optimizer do its job.
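For illustration, a minimal sketch of what materializing that column could look like, assuming the source table is called measurements and writing into a hypothetical table measurements_aligned:
# Sketch only: one-off rewrite that adds a pre-computed, minute-aligned timestamp.
# The target table name "measurements_aligned" is just a placeholder.
spark.sql("""
CREATE OR REPLACE TABLE measurements_aligned AS
SELECT
ts,
id,
value,
to_timestamp(from_unixtime(FLOOR((ts - 1) / 60) * 60 + 60)) AS ts_aligned
FROM measurements
""")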
07-16-2025 04:00 AM
I don't think to_timestamp() is the problem, but rather the GROUP BY, or more precisely the aggregation over ts.
Because even if I keep the raw ts, this does not work.
E.g. with this view definition, it still does not push down the ts filter:
SELECT
id,
ts,
AVG(value) OVER (ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS avg_value
FROM
measurements
07-16-2025 06:40 AM - edited 07-16-2025 06:50 AM
Hi @Steffen ,
to_timestamp() is a function. Let me show you with an example. I recreated your data, as you can see in the screenshot below:
Now, for the sake of the example I didn't apply any functions to the attributes, and the filter pushdown works as expected:
df = spark.sql("""
SELECT
ts,
id,
AVG(value) AS avg
FROM raw_measurements
GROUP BY
id,
ts
""")
df.createOrReplaceTempView("test_view")
query = f"""
SELECT *
FROM test_view
WHERE id = 1 AND ts BETWEEN 1751328004 AND 1751328104
"""
#display(spark.sql(query))
# Show logical/physical plan to inspect pushdown
spark.sql(query).explain(True)
Now, let's apply the to_timestamp function to the ts attribute:
df = spark.sql("""
SELECT
id,
to_timestamp(from_unixtime(FLOOR((ts - 1) / 60) * 60 + 60)) as ts,
AVG(value) AS avg
FROM raw_measurements
GROUP BY
id,
ts
""")
df.createOrReplaceTempView("test_view")
query = f"""
SELECT *
FROM test_view
WHERE id = 1 AND ts BETWEEN '2025-07-01T00:54:00.000+00:00' AND '2025-07-02T00:54:00.000+00:00'
"""
#display(spark.sql(query))
# Show logical/physical plan to inspect pushdown
spark.sql(query).explain(True)
As you can see, applying a simple function to an attribute can prevent the Spark SQL optimizer from kicking in.
Your second example is a bit different though:
SELECT
id,
ts,
AVG(value) OVER (ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS avg_value
FROM
measurements
Here I believe you're encountering a case similar to the one described at the link below:
[SPARK-23985] Predicate push down doesn't work with simple compound partition spec - ASF JIRA
In short, filters are pushed down only if they appear in the partitionSpec of the window function. So, when you use it like this:
df = spark.sql("""
SELECT
id,
ts,
AVG(value) OVER (ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS avg_value
FROM
raw_measurements
""")
df.createOrReplaceTempView("test_view")
query = f"""
SELECT *
FROM test_view
WHERE id = 1 AND ts BETWEEN 1751328004 AND 1751328104
"""
#display(spark.sql(query))
# Show logical/physical plan to inspect pushdown
spark.sql(query).explain(True)
then the filters are not pushed down:
But when you add the ts attribute to the PARTITION BY clause, the optimizer will do its job:
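Roughly like this (a sketch; note that adding ts to PARTITION BY changes the semantics of the running average, it's only meant to illustrate the pushdown behavior):
df = spark.sql("""
SELECT
id,
ts,
AVG(value) OVER (PARTITION BY id, ts ORDER BY ts ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS avg_value
FROM
raw_measurements
""")
df.createOrReplaceTempView("test_view")
query = """
SELECT *
FROM test_view
WHERE id = 1 AND ts BETWEEN 1751328004 AND 1751328104
"""
# Both predicates now reference only partitioning columns, so they should be pushed below the Window
spark.sql(query).explain(True)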
And finally, what I recommended to try was to compute the following attribute in the upstream table:
to_timestamp(from_unixtime(FLOOR((ts - 1) / 60) * 60 + 60)) as ts_aligned
And then create a view:
df = spark.sql("""
SELECT
ts_aligned,
id,
AVG(value) AS avg
FROM
raw_measurements
GROUP BY
id,
ts_aligned
""")
df.createOrReplaceTempView("test_view")
Now the optimizer is again able to push the filter down to the source:
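For example, a query like the sketch below (the timestamp bounds are placeholders) should show the ts_aligned predicate among the pushed filters in the plan:
query = """
SELECT *
FROM test_view
WHERE id = 1 AND ts_aligned BETWEEN '2025-07-01T00:00:00' AND '2025-07-02T00:00:00'
"""
# ts_aligned maps directly to a stored column, so the BETWEEN predicate can be pushed to the source
spark.sql(query).explain(True)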
07-16-2025 08:00 AM
Thank you for the detailed explanation!
Our upstream table is very big (300,000+ signals and over 5 years of data), so adding a new column, and with that rewriting the whole table, is a significant effort. In the end we would like to have multiple views based on different time intervals to resample, e.g. 60 sec, 15 min, 60 min, 1 day.
Then we would need to add all the aligned timestamps as columns in the upstream table, and if we wanted to add a new interval later, this would mean rewriting the whole table again...
I could not come up with a way to get the average by timespan without using some sort of function (like FLOOR) on the ts column, so I guess to achieve our needs we would need the ts_aligned columns in the table.
Since the alternative workaround is to not create views at all and always query the measurements table directly, we need to weigh that against the cost of rewriting the measurements table with the new columns.
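For reference, the one-off rewrite we would have to evaluate looks roughly like this (just a sketch; table and column names are placeholders):
# Sketch only: one pre-aligned timestamp column per resampling interval, added in a single rewrite.
spark.sql("""
CREATE OR REPLACE TABLE measurements_aligned AS
SELECT
ts,
id,
value,
to_timestamp(from_unixtime(FLOOR((ts - 1) / 60) * 60 + 60)) AS ts_aligned_60s,
to_timestamp(from_unixtime(FLOOR((ts - 1) / 900) * 900 + 900)) AS ts_aligned_15m,
to_timestamp(from_unixtime(FLOOR((ts - 1) / 3600) * 3600 + 3600)) AS ts_aligned_60m,
to_timestamp(from_unixtime(FLOOR((ts - 1) / 86400) * 86400 + 86400)) AS ts_aligned_1d
FROM measurements
""")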