
Pushdown of datetime filter to date partition.

Stokholm
New Contributor III

Hi Everybody,

I have 20 years of data, 600m rows.

I have partitioned them on year and month to get a file size that seems reasonable (128 MB).

All data is queried by timestamp, as all queries need to filter on exact hours.

So my requirement is that a filter on the timestamp should only access the year and month partitions that correspond to that timestamp.

This seems to be feasible using generated columns, according to the documentation from both Databricks and Microsoft.

Table batch reads and writes — Delta Lake Documentation

https://docs.delta.io/latest/delta-batch.html#use-generated-columns

Use Delta Lake generated columns - Azure Databricks | Microsoft Learn

https://learn.microsoft.com/en-us/azure/databricks/delta/generated-columns

Generally they say the same.

from delta.tables import DeltaTable

# Create a table partitioned by columns generated from eventTime.
DeltaTable.create(spark) \
  .tableName("default.events") \
  .addColumn("eventId", "BIGINT") \
  .addColumn("data", "STRING") \
  .addColumn("eventType", "STRING") \
  .addColumn("eventTime", "TIMESTAMP") \
  .addColumn("year", "INT", generatedAlwaysAs="YEAR(eventTime)") \
  .addColumn("month", "INT", generatedAlwaysAs="MONTH(eventTime)") \
  .addColumn("day", "INT", generatedAlwaysAs="DAY(eventTime)") \
  .partitionedBy("eventType", "year", "month", "day") \
  .execute()

# Filter on the base timestamp column only; each bound must name the column.
spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" AND eventTime <= "2020-10-01 12:00:00"')

"Delta Lake automatically generates a partition filter so that the preceding query only reads the data in partition year=2020/month=10/day=01 even if a partition filter is not specified.

You can use an EXPLAIN clause and check the provided plan to see whether Delta Lake automatically generates any partition filters."
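
One way to make that check repeatable is to search the plan text for a `PartitionFilters:` entry. The following is a minimal sketch and not part of the quoted documentation: `partition_filters` is a hypothetical helper name, and it assumes the plan text is laid out like `EXPLAIN FORMATTED` output, where the scan node lists its partition filters on a line starting with `PartitionFilters:`.

```python
import re
from typing import List


def partition_filters(plan_text: str) -> List[str]:
    """Return the contents of every non-empty 'PartitionFilters: [...]' line in a plan."""
    found = []
    for line in plan_text.splitlines():
        match = re.match(r"\s*PartitionFilters:\s*\[(.*)\]\s*$", line)
        if match and match.group(1).strip():
            found.append(match.group(1).strip())
    return found


# Excerpt of a scan node, shaped like EXPLAIN FORMATTED output (illustrative only).
sample_plan = """\
(1) Scan parquet default.events
Batched: true
PartitionFilters: [isnotnull(day#1), (day#1 = 1)]
PushedFilters: [IsNotNull(eventTime)]
"""

print(partition_filters(sample_plan))
```

An empty list means the scan node reported no partition filters, which is the symptom described in this thread. In a notebook, the plan text would presumably come from the single row returned by running the `EXPLAIN FORMATTED` statement through `spark.sql`.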

My problem is that when I use the EXPLAIN command, the plan clearly shows that no partition filter is applied.

%sql
EXPLAIN Formatted SELECT * FROM default.events2 WHERE eventTime = "2023-01-01 00:00:00"
== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- Scan parquet default.events2 (1)
 
 
(1) Scan parquet default.events2
Output [7]: [eventId#25670469L, data#25670470, eventTime#25670472, eventType#25670471, year#25670473, month#25670474, day#25670475]
Batched: true
Location: PreparedDeltaFileIndex [dbfs:/user/hive/warehouse/events2]
PushedFilters: [IsNotNull(eventTime), EqualTo(eventTime,2023-01-01 00:00:00.0)]
ReadSchema: struct<eventId:bigint,data:string,eventTime:timestamp>
 
(2) ColumnarToRow [codegen id : 1]
Input [7]: [eventId#25670469L, data#25670470, eventTime#25670472, eventType#25670471, year#25670473, month#25670474, day#25670475]
 
(3) Filter [codegen id : 1]
Input [7]: [eventId#25670469L, data#25670470, eventTime#25670472, eventType#25670471, year#25670473, month#25670474, day#25670475]
Condition : (isnotnull(eventTime#25670472) AND (eventTime#25670472 = 2023-01-01 00:00:00))
 
(4) Project [codegen id : 1]
Output [7]: [eventId#25670469L, data#25670470, eventType#25670471, eventTime#25670472, year#25670473, month#25670474, day#25670475]
Input [7]: [eventId#25670469L, data#25670470, eventTime#25670472, eventType#25670471, year#25670473, month#25670474, day#25670475]

Whereas if I include day in the WHERE clause, you can clearly see a partition filter.

"PartitionFilters: [isnotnull(day#25670689), (day#25670689 = 1)]"

%sql
EXPLAIN Formatted SELECT * FROM default.events2 WHERE eventTime = "2023-01-01 00:00:00" and day = 1
== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- Scan parquet default.events2 (1)
 
 
(1) Scan parquet default.events2
Output [7]: [eventId#25670683L, data#25670684, eventTime#25670686, eventType#25670685, year#25670687, month#25670688, day#25670689]
Batched: true
Location: PreparedDeltaFileIndex [dbfs:/user/hive/warehouse/events2]
PartitionFilters: [isnotnull(day#25670689), (day#25670689 = 1)]
PushedFilters: [IsNotNull(eventTime), EqualTo(eventTime,2023-01-01 00:00:00.0)]
ReadSchema: struct<eventId:bigint,data:string,eventTime:timestamp>
 
(2) ColumnarToRow [codegen id : 1]
Input [7]: [eventId#25670683L, data#25670684, eventTime#25670686, eventType#25670685, year#25670687, month#25670688, day#25670689]
 
(3) Filter [codegen id : 1]
Input [7]: [eventId#25670683L, data#25670684, eventTime#25670686, eventType#25670685, year#25670687, month#25670688, day#25670689]
Condition : (isnotnull(eventTime#25670686) AND (eventTime#25670686 = 2023-01-01 00:00:00))
 
(4) Project [codegen id : 1]
Output [7]: [eventId#25670683L, data#25670684, eventType#25670685, eventTime#25670686, year#25670687, month#25670688, day#25670689]
Input [7]: [eventId#25670683L, data#25670684, eventTime#25670686, eventType#25670685, year#25670687, month#25670688, day#25670689]

We are on Databricks Runtime 10.4, so this should be supported.

Has anyone achieved this? Or can you see any issues in my example?

Best Regards,

Rasmus

9 REPLIES

Anonymous
Not applicable

@Rasmus Stokholm​ :

It seems that you have correctly partitioned your data based on year and month and also defined generated columns for year, month, and day in your Delta table. When you run a query that filters on eventTime column, Delta Lake should automatically generate a partition filter based on the query condition and read only the relevant partitions, as explained in the Delta Lake documentation.

However, you mentioned that when you ran an EXPLAIN command, you did not see the expected partition filter being applied. In one case, you saw a partition filter when filtering on day in addition to eventTime, but not when filtering only on eventTime.

One thing to note is that Delta Lake may choose to read additional partitions if it determines that it would be more efficient to do so, for example, if the partition size is too small or if data skew is detected. You can use the spark.databricks.delta.logLevel configuration property to see more detailed logs and understand why Delta Lake is reading certain partitions.

Another possible reason for not seeing the partition filter being applied is that the query optimizer may have decided to use a different execution plan that does not rely on partition pruning. In this case, you may want to use the HINT syntax to force the optimizer to use a specific execution plan.

Lastly, you can also try using the PARTITIONED BY clause in your CREATE TABLE statement to explicitly define the partition columns and ensure that partition pruning is always used, as this is the preferred way to define partitions in Delta Lake.

Anonymous
Not applicable

Hi @Rasmus Stokholm​ 

Hope all is well! Just wanted to check in if you were able to resolve your issue and would you be happy to share the solution or mark an answer as best? Else please let us know if you need more help. 

We'd love to hear from you.

Thanks!

Stokholm
New Contributor III

Hi @Vidula Khanna​ and @Suteja Kanuri​ 

Thank you for your suggestions.

But I haven't really made progress.

I set the log level to debug, but I'm not sure where to check the output.

I didn't find any usable hints that force the plan to use the partitions.

I did try CREATE TABLE in SQL with the PARTITIONED BY clause directly.

But the outcome is the same.

I then tried switching to Delta Lake in Azure Storage and connecting through Synapse via OPENROWSET.

I inserted 1.8 billion rows and confirmed that the partitioning took place. Each folder (year and month) has 3-4 files of 10-30 MB.

The SQL shows that when I query through OPENROWSET with only the timestamp, without year and month, it scans about 14 GB.

When I include year and month in the WHERE clause, it scans 35 MB.

So it's not just time lost; the unnecessary scans also cost a lot of money.

Anonymous
Not applicable

@Rasmus Stokholm​ : Please let me know if this helps!

It's good that you set the log level to debug, but it's important to know where the logs are written to so you can check them. Usually, the logs will be written to a file on the server where the query is executed, or they may be sent to a centralized logging system.

Regarding forcing the access plan to use partitions, you can use the EXPLAIN statement to see the query plan generated by the database engine. This will give you insight into how the engine is processing your query and whether it's using partitioning as expected.

When you created the table in SQL with the PARTITIONED BY clause, did you also specify the partitioning column in the query? For example, if you partitioned the table by year and month, did you include those columns in your WHERE clause when querying the table?

It's good to hear that switching to Delta Lake in Azure Storage and using Synapse via OpenRowset helped with the partitioning issue. It's important to optimize your queries and data storage to avoid unnecessary scans, as you mentioned, to reduce both time and cost.

Stokholm
New Contributor III

Hi @Suteja Kanuri​,

I think you misunderstood my post. The move to Delta Lake in Azure Storage did not help. It still showed that everything was being scanned when using the timestamp in the WHERE clause.

I did not include the partition columns themselves in the WHERE clause; I only included the timestamp column that the partition columns were derived from.

According to the documentation, this should allow the optimizer to use a partition filter, but it does not.

Delta Lake may be able to generate partition filters for a query whenever a partition column is defined by one of the following expressions:

  • YEAR(col) and the type of col is TIMESTAMP.
  • Two partition columns defined by YEAR(col), MONTH(col) and the type of col is TIMESTAMP.
  • ...

If a partition column is defined by one of the preceding expressions, and a query filters data using the underlying base column of a generation expression, Delta Lake looks at the relationship between the base column and the generated column, and populates partition filters based on the generated partition column if possible.
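
To make the expected behavior concrete, here is a minimal sketch of the logical implication the documentation describes, for the simple case of an equality filter. It illustrates the relationship between the base column and the generated columns only; it is not Delta Lake's internal implementation. The function name `implied_partition_predicate` is hypothetical, and the column names assume the `year`, `month`, `day` definitions from the example table.

```python
from datetime import datetime


def implied_partition_predicate(event_time: datetime) -> str:
    """Partition predicate implied by `eventTime = <event_time>`.

    Assumes partition columns year, month, day generated as
    YEAR(eventTime), MONTH(eventTime), DAY(eventTime).
    """
    return (
        f"year = {event_time.year} "
        f"AND month = {event_time.month} "
        f"AND day = {event_time.day}"
    )


print(implied_partition_predicate(datetime(2023, 1, 1, 0, 0, 0)))
# year = 2023 AND month = 1 AND day = 1
```

For the query `eventTime = "2023-01-01 00:00:00"`, this is the kind of predicate one would expect to see under `PartitionFilters` in the plan if the automatic derivation were applied.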

Anonymous
Not applicable

@Rasmus Stokholm​ :

Based on what you've mentioned, it seems that the partition filters are not being utilized by the optimizer, even though the necessary partition columns are defined and the query is using the underlying base column of a generation expression. This could be due to a number of factors, including the specific syntax of the query or the structure of the data.

One potential solution you could try is to use the EXPLAIN command to get more information about how the query is being executed. This can help identify any issues with the query syntax or data structure that may be preventing the optimizer from using partition filters. Additionally, you could try reorganizing the data or reformatting the query to see if this makes a difference in how the partition filters are being used.

Stokholm
New Contributor III

Hi @Suteja Kanuri​ 

I have tried the EXPLAIN command on several different query types.

I tried both really simple queries and more complex ones that are also restricted on other columns.

None of them apply the partition filter when only the timestamp is used in the WHERE clause.

If I use the partition columns in the WHERE clause, the partition filters are applied and the query runs a lot quicker (of course).

But it is paramount that this filtering works with the timestamp as well. All users of these tables filter on the timestamp alone.
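
Until the automatic derivation works, one possible stopgap is to generate the redundant partition predicates from the timestamp range in whatever layer builds the queries. The following is a sketch under stated assumptions, not a recommendation from the thread: `months_between` and `where_clause` are hypothetical helpers, the table is assumed to be partitioned on `year` and `month` derived from `eventTime`, and the datetimes are assumed to be in the same time zone the session uses to evaluate `YEAR` and `MONTH`.

```python
from datetime import datetime
from typing import List, Tuple


def months_between(start: datetime, end: datetime) -> List[Tuple[int, int]]:
    """List every (year, month) pair touched by the closed range [start, end]."""
    if end < start:
        raise ValueError("end must not be earlier than start")
    pairs = []
    year, month = start.year, start.month
    while (year, month) <= (end.year, end.month):
        pairs.append((year, month))
        # Step to the next calendar month, rolling over at December.
        year, month = (year + 1, 1) if month == 12 else (year, month + 1)
    return pairs


def where_clause(start: datetime, end: datetime, ts_col: str = "eventTime") -> str:
    """Build a WHERE clause with the timestamp range plus explicit year/month predicates."""
    fmt = "%Y-%m-%d %H:%M:%S"
    partitions = " OR ".join(
        f"(year = {y} AND month = {m})" for y, m in months_between(start, end)
    )
    return (
        f"{ts_col} >= '{start.strftime(fmt)}' "
        f"AND {ts_col} <= '{end.strftime(fmt)}' "
        f"AND ({partitions})"
    )


print(where_clause(datetime(2022, 12, 31, 22), datetime(2023, 1, 1, 2)))
```

The timestamp predicate stays in place so the result is unchanged; the added year and month predicates only give the engine explicit partition columns to prune on. This does not help users who write the timestamp filter by hand, which is the case described above.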

Anonymous
Not applicable

@Rasmus Stokholm​ : does this help?

It is possible that the partition pruning is not happening because the partition column is not being properly defined as a timestamp type. Delta Lake uses the partitioning information to optimize queries and speed up data access, so it's important to ensure that the partition columns are defined correctly.

You can check the partitioning information of your table by running the DESCRIBE EXTENDED command in Delta Lake. This will give you more information about the table, including the partitioning scheme and the data types of the partition columns.

If the partition column is not properly defined as a timestamp type, you can try altering the table to redefine the partition column as a timestamp. For example, if your partition column is called "date", you can run the following command to redefine it as a timestamp:

ALTER TABLE mytable PARTITIONED BY (date TIMESTAMP)

Once you have verified that the partition column is correctly defined as a timestamp type, you can try running your query again and see if the partition pruning is working as expected.

Stokholm
New Contributor III

Hi guys, thanks for your advice. I found a solution: we upgraded the Databricks Runtime to 12.2, and now the pushdown of the partition filter works. The documentation said that 10.4 would be adequate, but evidently it wasn't.
