4 weeks ago
I have a Kafka streaming pipeline that ingests Pixels data. For each incoming record, I need to validate the Pixels key against an existing Delta table (pixel_tracking_data), which contains over 2 billion records accumulated over the past two years.
This Delta table is continuously appended to by another Databricks process.
Currently, the solution is only matching against records that are at least 10 minutes old.
The requirement, however, is to match against all up-to-date records in the pixel_tracking_data Delta table with low latency.
Although the current process completes the lookup in 7–8 seconds, it fails to include the most recent data. The underlying Delta table uses liquid clustering (liquid partitioning).
Please suggest any recommendations.
The code being used:
3 weeks ago
Matching streaming data in real time against a massive, fast-changing Delta table requires careful architectural choices. In your case the lookup itself is reasonably fast (7–8 seconds), but it only matches against data that is at least 10 minutes old, so the core problem is freshness rather than lookup latency. This is a common issue when Spark "caches" metadata and files while concurrent appends and streaming queries create a moving target. Here are several recommendations and architectural patterns to help you achieve low-latency, up-to-date lookups:
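Metadata lag: each query reads the Delta snapshot (table version) that was current when it resolved the table, so rows the other job commits afterwards are invisible to it. A cached DataFrame or a long-lived snapshot can stay on an old version much longer, especially when frequent streaming and append jobs overlap.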
Liquid clustering: although liquid clustering improves large-table reads, it does not affect visibility. A query can still miss very recent files if its metadata has not been refreshed or it runs against a stale snapshot.
7–8 seconds latency: this suggests the lookup scans a significant amount of data on every run, yet it still does not see concurrent appends.
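Enable Delta Lake's Auto Optimize features (optimized writes and auto compaction) on the job that appends to the table. They reduce the number of small files, which keeps file listing and metadata handling cheap for readers, but they do not by themselves make new commits visible sooner.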
Explicitly refresh cached metadata before each lookup/join:
spark.catalog.refreshTable("pixel_tracking_data")
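For Structured Streaming, do not cache or persist the Delta lookup DataFrame, because Spark caching of a Delta table can return stale data.
In a stream-static join, Databricks reads the static Delta side at its latest valid version in each micro-batch, so an uncached lookup should pick up new commits on the next batch.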
Drawback: metadata refresh can increase driver workload for large tables, so align refreshes with your micro-batch interval.
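To see why a reused snapshot misses fresh rows, here is a plain-Python model of Delta's versioned log (illustrative only, not Spark code; `DeltaLogModel`, `lookup_pinned` and `lookup_fresh` are made-up names). A reader pinned to one version, as with a cached DataFrame, never sees later commits, while a reader that resolves the latest version at the start of each micro-batch does.

```python
from dataclasses import dataclass, field


@dataclass
class DeltaLogModel:
    """Toy model of a Delta log: commit i holds the keys appended at version i."""
    commits: list = field(default_factory=list)

    def append(self, keys):
        self.commits.append(set(keys))
        return len(self.commits) - 1  # version just committed

    def latest_version(self):
        return len(self.commits) - 1

    def snapshot(self, version):
        """All keys visible at `version` (commits 0..version)."""
        keys = set()
        for batch in self.commits[: version + 1]:
            keys |= batch
        return keys


def lookup_pinned(log, pinned_version, event_keys):
    # Reuses one snapshot for every micro-batch, like a cached DataFrame.
    visible = log.snapshot(pinned_version)
    return {k: k in visible for k in event_keys}


def lookup_fresh(log, event_keys):
    # Resolves the latest version at the start of each micro-batch.
    visible = log.snapshot(log.latest_version())
    return {k: k in visible for k in event_keys}
```

With `v0 = log.append(["a", "b"])` followed by `log.append(["c"])`, the pinned lookup still reports `"c"` as unknown while the fresh lookup finds it.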
Delta Change Data Feed (CDF) lets you stream every new change to your pixel_tracking_data table. Consume the change feed as a stream and join it with your incoming pixel data, so the streaming side only looks up newly appended keys; keys committed before the feed's starting version still need the historical lookup.
This lets you join against recent keys without repeatedly scanning the whole table.
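The completeness argument behind the CDF approach can be sketched in plain Python (an illustrative model, not Spark code; `HybridKeyIndex` and the row layout are hypothetical, loosely mirroring CDF's `_commit_version` and `_change_type` columns). Keys loaded from the table as of a start version, plus every insert the change feed reports after that version, together cover all keys. In Spark the two halves would roughly correspond to a batch read with the `versionAsOf` option and a change-feed stream with `startingVersion` set to the next version. The sketch assumes the table is append-only, so only inserts are applied.

```python
from typing import Iterable, Tuple

# (commit_version, change_type, key), loosely mirroring CDF's
# _commit_version and _change_type columns (hypothetical layout).
ChangeRow = Tuple[int, str, str]


class HybridKeyIndex:
    """Model of 'snapshot at version v + change feed after v' (not Spark code)."""

    def __init__(self, snapshot_keys: Iterable[str], start_version: int):
        # Keys read once from the table as of start_version.
        self.keys = set(snapshot_keys)
        self.version = start_version

    def apply_changes(self, rows: Iterable[ChangeRow]) -> None:
        # Keep only commits newer than what the index already covers,
        # in version order (the sort is stable within a version).
        new_rows = sorted((r for r in rows if r[0] > self.version), key=lambda r: r[0])
        for _version, change_type, key in new_rows:
            # Assumes an append-only table, so only inserts matter.
            if change_type == "insert":
                self.keys.add(key)
        if new_rows:
            self.version = new_rows[-1][0]

    def contains(self, key: str) -> bool:
        return key in self.keys
```

Holding two billion keys in a Python set is not the point; the model only shows that no commit falls through the gap between the historical half and the streaming half.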
Instead of joining your whole streaming input to the entire history, consider a stateful stream-stream join:
Stream your pixel_tracking_data table using Delta's CDF, or with a plain Structured Streaming read of the Delta table (sufficient if the table is append-only).
Maintain a map of recent keys in the streaming state store (e.g., mapGroupsWithState in Scala, or applyInPandasWithState in PySpark).
When a new pixel event arrives, check this state for the latest known key. This drastically reduces the amount of data scanned per event.
Example:
# Stream both sources
pixel_stream = spark.readStream.format("delta").table("pixels_db_df_dlt")
delta_stream = spark.readStream.format("delta").table("pixel_tracking_data")
# Join on the fly with watermarks, or keep recent keys in state with applyInPandasWithState
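The state such a job keeps can be modelled in plain Python (illustrative only; in PySpark this logic would live in the function passed to `applyInPandasWithState`, and `RecentKeyState` plus the retention window are assumptions). Each key remembers when it was last committed, and keys that fall behind a watermark are evicted so the state does not grow without bound.

```python
from datetime import datetime, timedelta


class RecentKeyState:
    """Model of per-key streaming state with watermark-based eviction (not Spark code)."""

    def __init__(self, retention: timedelta):
        self.retention = retention
        self.last_seen = {}  # key -> latest commit timestamp seen for that key
        self.watermark = datetime.min

    def update_from_tracking(self, rows):
        # rows: (key, commit_ts) pairs from the pixel_tracking_data stream
        for key, ts in rows:
            if key not in self.last_seen or ts > self.last_seen[key]:
                self.last_seen[key] = ts
            # The watermark trails the newest timestamp by the retention window.
            self.watermark = max(self.watermark, ts - self.retention)
        # Evict keys last seen before the watermark, similar to a state timeout.
        self.last_seen = {k: t for k, t in self.last_seen.items() if t >= self.watermark}

    def match(self, event_keys):
        # Incoming pixel events are checked against state only; no table scan.
        return {k: k in self.last_seen for k in event_keys}
```

Keys evicted here are exactly the ones that would need the slower historical lookup, so the retention window trades state size against how often the fallback runs.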
If 99% of lookups target recent appends, create a “hot cache” of the last N minutes’ worth of keys in memory (possibly by microbatching Delta table ingests).
Look up keys in this cache first, and only on a miss fall back to scanning the full table or looking further back in history.
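A minimal sketch of the hot-cache-with-fallback pattern, assuming a TTL-based eviction policy and a caller-supplied `fallback` function that stands in for the full-table lookup (`HotKeyCache` and both policies are hypothetical). The injectable clock is only there to make the behaviour testable.

```python
import time
from collections import OrderedDict
from typing import Callable


class HotKeyCache:
    """Recent keys kept for ttl_seconds; misses go to a slower fallback lookup."""

    def __init__(self, ttl_seconds: float, fallback: Callable[[str], bool],
                 clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.fallback = fallback        # stands in for the full-table lookup
        self.clock = clock
        self._entries = OrderedDict()   # key -> time added, oldest first
        self.fallback_calls = 0

    def add(self, key: str) -> None:
        # Re-adding a key refreshes its timestamp and moves it to the newest end.
        self._entries.pop(key, None)
        self._entries[key] = self.clock()

    def _evict_expired(self) -> None:
        cutoff = self.clock() - self.ttl
        # Insertion order is also time order, so stop at the first fresh entry.
        while self._entries:
            _, added = next(iter(self._entries.items()))
            if added >= cutoff:
                break
            self._entries.popitem(last=False)

    def contains(self, key: str) -> bool:
        self._evict_expired()
        if key in self._entries:
            return True
        self.fallback_calls += 1
        return self.fallback(key)
```

Counting `fallback_calls` is a cheap way to check the assumption that most lookups really do hit recent appends.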
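Adjust streaming trigger intervals to reduce lag.
For near-real-time lookups, use the default micro-batch trigger or a short processingTime trigger. Trigger.AvailableNow processes whatever data is available and then stops, so it suits scheduled incremental runs rather than continuous low-latency matching.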
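A rough way to reason about trigger tuning, assuming the upstream writer commits on its own fixed interval: a row can only be matched after the writer commits it and after the lookup job's next micro-batch starts. With a processing-time trigger, batch starts are spaced by the larger of the trigger interval and the batch duration. The helper below is a hypothetical back-of-the-envelope function, not a Spark API.

```python
def worst_case_staleness_s(writer_commit_interval_s: float,
                           trigger_interval_s: float,
                           batch_duration_s: float) -> float:
    """Rough upper bound (seconds) on how long a new row can stay unmatched."""
    # If a batch overruns its trigger interval, the next one starts right away.
    batch_spacing = max(trigger_interval_s, batch_duration_s)
    # Wait for the writer to commit, then for the reader's next batch to start.
    return writer_commit_interval_s + batch_spacing
```

For example, if the process appending to pixel_tracking_data only commits every 10 minutes, no reader-side trigger setting can make the lookup fresher than that, so the writer's trigger is worth checking as well.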
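Cluster the table on the join keys (e.g., SubscriberGuid and BTag) so file-level data skipping prunes most files during the lookup. With liquid clustering, set these as clustering keys rather than Z-ordering; the two cannot be combined on the same table.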
The Photon execution engine can further speed up scans and joins; check whether your cluster/runtime supports it. Delta Universal Format (UniForm) is an interoperability feature for Iceberg readers, not a performance feature, so it will not help here.
Databricks SQL materialized views: a materialized view of just the recent keys (e.g., the past 15 minutes) can keep your stream's joins away from full table scans, but it is only as fresh as its last refresh, so it is not lag-free.
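from pyspark.sql.functions import expr
# Enable Change Data Feed on your Delta table (only commits made after this are captured)
spark.sql("ALTER TABLE ... SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")
# Read the change stream from the table; keep inserts only
change_feed = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table("pixel_tracking_data")
    .filter("_change_type = 'insert'")
    .withWatermark("_commit_timestamp", "30 minutes")
)
# Stream-stream outer joins need watermarks on both sides plus a time-range condition;
# event_time and the 30-minute bounds are illustrative assumptions
pixels = pixel_stream.withWatermark("event_time", "30 minutes")
# Left-join input to the change feed; unmatched rows are emitted only after the watermark passes
result = pixels.join(
    change_feed,
    (pixels.SubscriberGuid == change_feed.SubscriberGuid)
    & (pixels.TrackingTag == change_feed.BTag)
    & change_feed["_commit_timestamp"].between(
        pixels.event_time - expr("INTERVAL 30 MINUTES"),
        pixels.event_time + expr("INTERVAL 30 MINUTES"),
    ),
    "left",
)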
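Schedule OPTIMIZE and VACUUM off-hours if possible so they do not compete with the streaming job for cluster resources, and keep VACUUM's retention longer than your longest-running query so it never deletes files a lookup snapshot still needs.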
Monitor the streaming UI's "input rows per second" and "stateful operator state size" for any increase in lag.
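These metrics can also be checked programmatically: `StreamingQuery.lastProgress` returns the most recent progress report as a Python dict (or `None` before the first batch) with fields such as `inputRowsPerSecond`, `processedRowsPerSecond`, and `stateOperators`, each state operator carrying `numRowsTotal`. The helper below is a hypothetical sketch; the 50-million-row state threshold is an arbitrary placeholder to tune for your job.

```python
def lag_signals(progress: dict, max_state_rows: int = 50_000_000) -> list:
    """Return human-readable warnings for one StreamingQuery.lastProgress dict."""
    warnings = []
    input_rate = progress.get("inputRowsPerSecond") or 0.0
    processed_rate = progress.get("processedRowsPerSecond") or 0.0
    # Rows arriving faster than they are processed means the backlog is growing.
    if input_rate > processed_rate:
        warnings.append(
            f"falling behind: {input_rate:.0f} rows/s in, {processed_rate:.0f} rows/s processed"
        )
    # Unbounded state growth usually means eviction (watermark/timeout) is not working.
    for i, op in enumerate(progress.get("stateOperators", [])):
        rows = op.get("numRowsTotal", 0)
        if rows > max_state_rows:
            warnings.append(f"state operator {i} holds {rows} rows")
    return warnings
```

In the pipeline you might call `lag_signals(query.lastProgress)` periodically, skipping `None`, and alert on a non-empty result.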
For sustained high-velocity lookups, consider adding a low-latency NoSQL store (such as Redis) as a secondary hot cache for near-real-time keys, while continuing to write everything to Delta for durable storage.
Summary Table: Recommendations
| Solution | Latency | Freshness | Complexity | Cost |
|---|---|---|---|---|
| Metadata Refresh + AutoOptimize | Medium | High | Medium | Medium |
| Delta CDF + Stream Join | Low | Highest | High | Medium |
| Caching “Hot” Keys | Low | High | Medium | Low |
| NoSQL Hot Cache | Lowest | Highest | High | Highest |
Combining Delta CDF streaming joins and/or hot-key caches with metadata refresh and clustering on the join keys should give you up-to-date, low-latency lookups at scale.