
Pushdown of datetime filter to date partition.

Stokholm
New Contributor III

Hi Everybody,

I have 20 years of data, 600m rows.

I have partitioned them on year and month to generate a file size that seems reasonable (128 MB).

All data is queried using a timestamp, as all queries need to filter on the exact hours.

So my requirement is that, when filtering on the timestamp, only the year and month partitions matching the timestamp are accessed.

This seems to be feasible using generated columns, according to the documentation from both Databricks and Microsoft.

Table batch reads and writes โ€” Delta Lake Documentation

https://docs.delta.io/latest/delta-batch.html#use-generated-columns

Use Delta Lake generated columns - Azure Databricks | Microsoft Learn

https://learn.microsoft.com/en-us/azure/databricks/delta/generated-columns

Generally they say the same thing.

from delta.tables import DeltaTable

# Create a Delta table with generated year/month/day columns,
# partitioned on eventType plus those generated columns.
DeltaTable.create(spark) \
  .tableName("default.events") \
  .addColumn("eventId", "BIGINT") \
  .addColumn("data", "STRING") \
  .addColumn("eventType", "STRING") \
  .addColumn("eventTime", "TIMESTAMP") \
  .addColumn("year", "INT", generatedAlwaysAs="YEAR(eventTime)") \
  .addColumn("month", "INT", generatedAlwaysAs="MONTH(eventTime)") \
  .addColumn("day", "INT", generatedAlwaysAs="DAY(eventTime)") \
  .partitionedBy("eventType", "year", "month", "day") \
  .execute()
spark.sql('SELECT * FROM default.events WHERE eventTime >= "2020-10-01 00:00:00" AND eventTime <= "2020-10-01 12:00:00"')

"Delta Lake automatically generates a partition filter so that the preceding query only reads the data in partition 

year=2020/month=10/day=01

 even if a partition filter is not specified.

You can use an EXPLAIN clause and check the provided plan to see whether Delta Lake automatically generates any partition filters."

My problem is that when I use the EXPLAIN command, you can clearly see that it does not use the partition filter as intended.

%sql
EXPLAIN Formatted SELECT * FROM default.events2 WHERE eventTime = "2023-01-01 00:00:00"
== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- Scan parquet default.events2 (1)
 
 
(1) Scan parquet default.events2
Output [7]: [eventId#25670469L, data#25670470, eventTime#25670472, eventType#25670471, year#25670473, month#25670474, day#25670475]
Batched: true
Location: PreparedDeltaFileIndex [dbfs:/user/hive/warehouse/events2]
PushedFilters: [IsNotNull(eventTime), EqualTo(eventTime,2023-01-01 00:00:00.0)]
ReadSchema: struct<eventId:bigint,data:string,eventTime:timestamp>
 
(2) ColumnarToRow [codegen id : 1]
Input [7]: [eventId#25670469L, data#25670470, eventTime#25670472, eventType#25670471, year#25670473, month#25670474, day#25670475]
 
(3) Filter [codegen id : 1]
Input [7]: [eventId#25670469L, data#25670470, eventTime#25670472, eventType#25670471, year#25670473, month#25670474, day#25670475]
Condition : (isnotnull(eventTime#25670472) AND (eventTime#25670472 = 2023-01-01 00:00:00))
 
(4) Project [codegen id : 1]
Output [7]: [eventId#25670469L, data#25670470, eventType#25670471, eventTime#25670472, year#25670473, month#25670474, day#25670475]
Input [7]: [eventId#25670469L, data#25670470, eventTime#25670472, eventType#25670471, year#25670473, month#25670474, day#25670475]

Whereas if I include day, you can clearly see a partition filter:

"PartitionFilters: [isnotnull(day#25670689), (day#25670689 = 1)]"

%sql
EXPLAIN Formatted SELECT * FROM default.events2 WHERE eventTime = "2023-01-01 00:00:00" and day = 1
== Physical Plan ==
* Project (4)
+- * Filter (3)
   +- * ColumnarToRow (2)
      +- Scan parquet default.events2 (1)
 
 
(1) Scan parquet default.events2
Output [7]: [eventId#25670683L, data#25670684, eventTime#25670686, eventType#25670685, year#25670687, month#25670688, day#25670689]
Batched: true
Location: PreparedDeltaFileIndex [dbfs:/user/hive/warehouse/events2]
PartitionFilters: [isnotnull(day#25670689), (day#25670689 = 1)]
PushedFilters: [IsNotNull(eventTime), EqualTo(eventTime,2023-01-01 00:00:00.0)]
ReadSchema: struct<eventId:bigint,data:string,eventTime:timestamp>
 
(2) ColumnarToRow [codegen id : 1]
Input [7]: [eventId#25670683L, data#25670684, eventTime#25670686, eventType#25670685, year#25670687, month#25670688, day#25670689]
 
(3) Filter [codegen id : 1]
Input [7]: [eventId#25670683L, data#25670684, eventTime#25670686, eventType#25670685, year#25670687, month#25670688, day#25670689]
Condition : (isnotnull(eventTime#25670686) AND (eventTime#25670686 = 2023-01-01 00:00:00))
 
(4) Project [codegen id : 1]
Output [7]: [eventId#25670683L, data#25670684, eventType#25670685, eventTime#25670686, year#25670687, month#25670688, day#25670689]
Input [7]: [eventId#25670683L, data#25670684, eventTime#25670686, eventType#25670685, year#25670687, month#25670688, day#25670689]

We are on Databricks Runtime 10.4, so this should be supported.

Has anyone achieved this? Or can you see any issues in my example?
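
For completeness: so far, the only way I can get pruning is to derive the partition predicates by hand from the timestamp and add them to the WHERE clause myself, as in the day = 1 example above. A sketch of that workaround (not what I want long term):

%sql
-- Workaround sketch: spell out the generated partition columns by hand.
SELECT * FROM default.events2
WHERE eventTime = "2023-01-01 00:00:00"
  AND year = 2023
  AND month = 1
  AND day = 1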

Best Regards,

Rasmus

9 REPLIES

Anonymous
Not applicable

@Rasmus Stokholm:

It seems that you have correctly partitioned your data based on year and month and also defined generated columns for year, month, and day in your Delta table. When you run a query that filters on eventTime column, Delta Lake should automatically generate a partition filter based on the query condition and read only the relevant partitions, as explained in the Delta Lake documentation.

However, you mentioned that when you ran an EXPLAIN command, you did not see the expected partition filter being applied. In one case, you saw a partition filter when filtering on day in addition to eventTime, but not when filtering only on eventTime.

One thing to note is that Delta Lake may choose to read additional partitions if it determines that it would be more efficient to do so, for example, if the partition size is too small or if data skew is detected. You can use the spark.databricks.delta.logLevel configuration property to see more detailed logs and understand why Delta Lake is reading certain partitions.
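
If your runtime honors that property (I have not verified the exact name on every version, so treat this as a sketch), it can be set like any other session configuration:

%sql
-- Sketch: set the suggested logging property for the current session.
-- The property name is taken from the suggestion above and may not be
-- available on all runtime versions.
SET spark.databricks.delta.logLevel = DEBUG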

Another possible reason for not seeing the partition filter being applied is that the query optimizer may have decided to use a different execution plan that does not rely on partition pruning. In this case, you may want to use the HINT syntax to force the optimizer to use a specific execution plan.

Lastly, you can also try using the PARTITIONED BY clause in your CREATE TABLE statement to explicitly define the partition columns and ensure that partition pruning is always used, as this is the preferred way to define partitions in Delta Lake.
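
A minimal sketch of such a statement, reusing the columns from your example (generated columns require a runtime that supports them, DBR 8.3 or later):

%sql
-- Sketch: explicit PARTITIONED BY over generated year/month/day columns.
CREATE TABLE default.events (
  eventId   BIGINT,
  data      STRING,
  eventType STRING,
  eventTime TIMESTAMP,
  year  INT GENERATED ALWAYS AS (YEAR(eventTime)),
  month INT GENERATED ALWAYS AS (MONTH(eventTime)),
  day   INT GENERATED ALWAYS AS (DAY(eventTime))
)
USING DELTA
PARTITIONED BY (eventType, year, month, day)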

Anonymous
Not applicable

Hi @Rasmus Stokholm

Hope all is well! Just wanted to check in: were you able to resolve your issue? If so, would you be happy to share the solution or mark an answer as best? Otherwise, please let us know if you need more help.

We'd love to hear from you.

Thanks!

Stokholm
New Contributor III

Hi @Vidula Khanna and @Suteja Kanuri

Thank you for your suggestions.

But I haven't really progressed.

I set the log level to debug, but I'm not quite sure where to check the outcome.

I didn't find any usable hints that force the access plan to use the partitions.

I did try CREATE TABLE in SQL with the PARTITIONED BY clause directly.

But it still has the same outcome.

I then tried switching to Delta Lake in Azure Storage and connecting through Synapse via OPENROWSET.

I inserted 1.8 billion rows and confirmed the partitioning took place. Each folder (year and month) has 3-4 files of 10-30 MB.

The SQL here shows that when I query the OPENROWSET with just the timestamp and no year and month, it scans about 14 GB.

When I include the year and month in the WHERE clause, it scans 35 MB.

So it's not just time lost; it also costs a lot of money due to the unnecessary scans.
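
For reference, the pruned Synapse query looks roughly like this (the storage path is a placeholder; with FORMAT = 'DELTA' the serverless pool exposes the partition columns, so the year and month predicates are what cut the scan down):

-- Sketch: Synapse serverless SQL over the Delta folder; path is a placeholder.
SELECT *
FROM OPENROWSET(
    BULK 'https://<storageaccount>.dfs.core.windows.net/<container>/events/',
    FORMAT = 'DELTA'
) AS r
WHERE r.eventTime >= '2023-01-01 00:00:00'
  AND r.eventTime <  '2023-01-01 12:00:00'
  AND r.year = 2023
  AND r.month = 1;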

Anonymous
Not applicable

@Rasmus Stokholm: Please let me know if this helps!

It's good that you set the log level to debug, but it's important to know where the logs are written so you can check them. Usually the logs are written to a file on the server where the query is executed (on Databricks, the driver logs in the cluster UI), or they may be sent to a centralized logging system.

Regarding forcing the access plan to use partitions, you can use the EXPLAIN statement to see the query plan generated by the engine. This will give you insight into how the engine is processing your query and whether it's using partitioning as expected.

When you created the table in SQL with the PARTITIONED BY clause, did you also specify the partitioning columns in the query? For example, if you partitioned the table by year and month, did you include those columns in your WHERE clause when querying the table?
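
For example, something along these lines (a sketch against the table from your original post, with the partition predicates repeated alongside the timestamp range):

%sql
-- Sketch: include the generated partition columns explicitly in the WHERE clause.
SELECT * FROM default.events2
WHERE eventTime >= '2023-01-01 00:00:00'
  AND eventTime <  '2023-01-02 00:00:00'
  AND year = 2023
  AND month = 1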

It's good to hear that switching to Delta Lake in Azure Storage and using Synapse via OpenRowset helped with the partitioning issue. It's important to optimize your queries and data storage to avoid unnecessary scans, as you mentioned, to reduce both time and cost.
