Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

DLT or DP: How to do full refresh of Delta table from DLT Pipeline to consider all records from Tbl

gudurusreddy99
New Contributor II

Requirement

I have a Kafka streaming pipeline that ingests Pixels data. For each incoming record, I need to validate the Pixels key against an existing Delta table (pixel_tracking_data), which contains over 2 billion records accumulated over the past two years.
This Delta table is continuously appended to by another Databricks process.


Issue

Currently, the solution is only matching against records that are at least 10 minutes old.
The requirement, however, is to match against all up-to-date records in the pixel_tracking_data Delta table with low latency.

Although the current process completes the lookup in 7–8 seconds, it fails to include the most recent data. The underlying Delta table uses liquid clustering (liquid partitioning).

Please suggest any recommendations.

The code being used:

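import dlt
from datetime import datetime as dt
from pyspark.sql import functions as F

# `spark`, `env`, and `logger` are assumed to be defined elsewhere in the pipeline.

@dlt.view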
def lookup_keys():
    return (
        dlt.read_stream("pixels_db_df_dlt")
        .where(F.col('ExecException').isNull())
        .select(
            F.upper(F.col("TrackingTag")).alias("BTag"),
            F.upper(F.col("SubscriberGuid")).alias("SubscriberGuid")
        )
        .distinct()
    )

@dlt.table(name="pixel_subset")
def pixel_subset():
    keys = dlt.read_stream("lookup_keys")

    # # Force Delta metadata refresh so Spark sees new files quickly
    # spark.catalog.refreshTable(f"operational_{env}.silver_data.pixel_tracking_data")

    base = (
        spark.read.table(f"operational_{env}.silver_data.pixel_tracking_data")
        .select(
            F.col("OperatorId").alias("SubscriberGuid"),
            F.col("BTag"),
            F.col("PublishPointID"),
            F.col("AffiliateID"),
            F.col("extraParameters"),
            F.col("AdID").alias("AdId"),
            F.col("MarketingSourceGroupID").alias("SourceGroupID")
        )
    )

    # Join the streaming keys to the static lookup on both key columns
    return keys.join(base, ["SubscriberGuid", "BTag"], "inner")


@dlt.table(name="lookup_df")
def lookup_df_combined():
    logger.info(str(dt.now()) + " - Started reading data for lookup_df_combined")
 
    pixels_df = (
        dlt.read_stream("pixels_db_df_dlt")
        .where(F.col('ExecException').isNull())
        .drop(F.col('ExecException'))
    )
    matched_btags = spark.read.table(f"operational_{env}.silver_data.pixel_subset")


    # Step 4: Perform final join (combination of both tables)
    result = (
        pixels_df.alias('curr')
        .join(
            matched_btags.alias('lookup'),
            (F.col("curr.SubscriberGuid") == F.col("lookup.SubscriberGuid"))
            & (F.col("curr.TrackingTag") == F.col("lookup.BTag")),
            'left'
        )
        .withColumn('retry', F.lit(0))
        .withColumn(
            'resolved',
            F.when(F.col('lookup.BTag').isNotNull(), F.lit(True)).otherwise(False)
        )
        .select(
            F.col('curr.*'),
            F.col('lookup.PublishPointID').alias('PublishPointID'),
            F.col('lookup.AffiliateID').alias('AffiliateID'),
            # F.col('lookup.AdLogActionID').alias('AdLogActionID'),
            F.col('lookup.extraParameters').alias('extraParameters'),
            F.col('lookup.SourceGroupID').alias('SourceGroupID'),
            F.col('lookup.AdID').alias('AdId'),
            F.col('retry'),
            F.col('resolved')
        ).withColumn("commit_timestamp",F.expr("current_timestamp()"))
        .withColumn("loaded_at", F.expr("current_timestamp()"))
    )

    logger.info(str(dt.now()) + " - Finished processing lookup_df_combined")
    return result

1 REPLY

mark_ott
Databricks Employee

Matching streaming data in real time against a massive, fast-changing Delta table requires careful architectural choices. In your case, the lookup completes quickly but only matches against data that is at least 10 minutes old, so the most recent records are missed. This is a common issue when a query works from a stale snapshot or cached metadata while concurrent appends create a moving target. Here are several recommendations and architectural patterns to help you achieve low-latency, up-to-date lookups:


Root Cause Analysis

  • Metadata Lag: By default, Spark streaming and Delta Lake may not immediately see the latest data appended to a Delta table, especially when frequent streaming jobs and append jobs overlap.

  • Liquid Clustering: Although liquid clustering improves large-table reads, Spark queries can still miss very recent files if metadata has not been refreshed or if a run uses a stale snapshot.

  • 7–8 Seconds Latency: This suggests the join is scanning a significant amount of data (both the base table and newly appended files) but still not catching concurrent updates.


Key Recommendations

1. Enable “Auto Optimize” and Frequent Metadata Refresh

  • Enable Delta Lake's Auto Optimize features (optimized writes and auto compaction), which reduce the number of small files so that each new snapshot of the table is cheaper to list and read.

  • Explicitly refresh metadata before each lookup/join:

    python
    spark.catalog.refreshTable("pixel_tracking_data")
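  • For structured streaming, you could also experiment with the following setting, but verify that your runtime supports it first; file-source settings generally do not affect how Delta tables are read: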

    python
    spark.conf.set("spark.sql.streaming.fileSource.cleaner.deleteAfterScan.enabled", "true")
  • Drawback: Metadata refresh can increase driver workload for large tables, so time refreshes with your streaming batch intervals.
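
One way to make the refresh explicit, outside of a DLT-managed table, is to re-read the lookup table inside a `foreachBatch` handler, so that each micro-batch resolves the table when it runs rather than once at stream start. The following is a minimal sketch under assumptions, not a drop-in fix: the helper name `make_enricher`, the table names, and the join keys are hypothetical, it assumes PySpark 3.3+ (for `DataFrame.sparkSession`), and it assumes the lookup table already exposes the join columns under the same names as the stream.

```python
def make_enricher(lookup_table, target_table, join_keys):
    """Build a foreachBatch handler that re-reads `lookup_table` on every micro-batch."""

    def enrich_batch(batch_df, batch_id):
        # Assumption: PySpark 3.3+ exposes the session on the DataFrame.
        spark = batch_df.sparkSession
        # A new batch DataFrame is created for each micro-batch, so the table
        # is resolved at batch time instead of once when the stream starts.
        lookup = spark.read.table(lookup_table)
        enriched = batch_df.join(lookup, join_keys, "left")
        enriched.write.format("delta").mode("append").saveAsTable(target_table)

    return enrich_batch


# Hypothetical wiring (table names and checkpoint path are placeholders):
# handler = make_enricher(
#     "silver_data.pixel_tracking_data",
#     "silver_data.lookup_df",
#     ["SubscriberGuid", "BTag"],
# )
# (pixel_stream.writeStream
#     .option("checkpointLocation", "<checkpoint path>")
#     .foreachBatch(handler)
#     .start())
```

Re-reading per batch trades some planning overhead on each micro-batch for freshness, which is the same trade-off as the drawback noted above.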

2. Use Change Data Feed (Delta CDF)

  • Delta CDF makes it possible to stream all new changes to your pixel_tracking_data table. Consume pixel_tracking_data as a stream, union with your incoming pixel data, and perform lookups exclusively on new records.

  • This ensures you are only joining against recent keys, not scanning the whole table repeatedly.

3. Use Streaming “Temporal Join” Pattern

  • Instead of joining your whole streaming input to the entire history, consider a Temporal Table Join:

    • Stream your pixel_tracking_data table using Delta's CDF or by setting up a Spark Structured Streaming read on the Delta table (if changes are frequent and mostly inserts).

    • Maintain a stateful in-memory map of recent keys (e.g., with mapGroupsWithState in Scala/Java, or applyInPandasWithState in PySpark).

    • When a new pixel event arrives, check this state for the latest known key. This drastically reduces the amount of data scanned per event.

  • Example:

    python
    # Stream both sources
    pixel_stream = spark.readStream.format("delta").table("pixels_db_df_dlt")
    delta_stream = spark.readStream.format("delta").table("pixel_tracking_data")
    # Join on the fly using stateful processing (mapGroupsWithState in Scala,
    # applyInPandasWithState in PySpark) or watermarking if needed
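
To make the state-based lookup concrete, here is a minimal pure-Python sketch of the idea, independent of Spark. It keeps the latest tracking record per key in a dictionary and resolves pixel events against it. The class `KeyedLookupState` and its method names are hypothetical, and the field names are borrowed from the question; in a real pipeline this state would live in a stateful streaming operator rather than in a local dict.

```python
from typing import Dict, Tuple

Key = Tuple[str, str]  # (SubscriberGuid, BTag), upper-cased as in the question


class KeyedLookupState:
    """Latest tracking record per key, updated as tracking rows arrive."""

    def __init__(self) -> None:
        self._latest: Dict[Key, dict] = {}

    @staticmethod
    def _key(subscriber_guid: str, btag: str) -> Key:
        return (subscriber_guid.upper(), btag.upper())

    def on_tracking_row(self, row: dict) -> None:
        # Later rows for the same key overwrite earlier ones.
        self._latest[self._key(row["OperatorId"], row["BTag"])] = row

    def resolve(self, event: dict) -> dict:
        # Look the event up in state; unmatched events are flagged, not dropped.
        match = self._latest.get(
            self._key(event["SubscriberGuid"], event["TrackingTag"])
        )
        return {
            **event,
            "resolved": match is not None,
            "PublishPointID": match["PublishPointID"] if match else None,
        }


state = KeyedLookupState()
state.on_tracking_row({"OperatorId": "abc", "BTag": "t1", "PublishPointID": 7})
hit = state.resolve({"SubscriberGuid": "ABC", "TrackingTag": "T1"})
miss = state.resolve({"SubscriberGuid": "ABC", "TrackingTag": "T2"})
```

Events that come back with `resolved` set to `False` could be retried later, which is what the `retry` and `resolved` columns in the question appear to be for.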

4. Reduce Table Scan Scope with Mini-Batches or Cache Recent Keys

  • If 99% of lookups target recent appends, create a “hot cache” of the last N minutes’ worth of keys in memory (possibly by microbatching Delta table ingests).

  • Lookup against this cache, and only for fallbacks scan the entire table or look back further in history.
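
The hot-cache-with-fallback idea can be sketched in plain Python as follows. This is an illustration under assumptions rather than a production design: the class `HotKeyCache`, its parameters, and the dictionary standing in for the full table are all hypothetical, and a real fallback would query the Delta table instead.

```python
import time
from typing import Callable, Dict, Hashable, Optional, Tuple


class HotKeyCache:
    """Keeps keys seen in the last `ttl_seconds`; other lookups go to `fallback`."""

    def __init__(
        self,
        ttl_seconds: float,
        fallback: Callable[[Hashable], Optional[dict]],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._ttl = ttl_seconds
        self._fallback = fallback
        self._clock = clock
        self._entries: Dict[Hashable, Tuple[float, dict]] = {}

    def put(self, key: Hashable, value: dict) -> None:
        # Record the insertion time so the entry can expire later.
        self._entries[key] = (self._clock(), value)

    def evict_expired(self) -> None:
        cutoff = self._clock() - self._ttl
        self._entries = {k: v for k, v in self._entries.items() if v[0] >= cutoff}

    def get(self, key: Hashable) -> Optional[dict]:
        entry = self._entries.get(key)
        if entry is not None and entry[0] >= self._clock() - self._ttl:
            return entry[1]  # fresh hit in the hot cache
        return self._fallback(key)  # miss or expired: consult the full table


# Hypothetical usage with a controllable clock and a dict as the "full table".
now = [0.0]
cold_store = {("ABC", "T0"): {"PublishPointID": 1}}
cache = HotKeyCache(ttl_seconds=600, fallback=cold_store.get, clock=lambda: now[0])
cache.put(("ABC", "T1"), {"PublishPointID": 7})
```

Injecting the clock keeps the expiry logic testable without sleeping; the default is `time.monotonic`.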

5. Tune Stream-Table Join Settings

  • Adjust streaming trigger intervals to reduce lag.

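  • Use a fixed processing-time micro-batch trigger for more predictable latency; Trigger.AvailableNow suits scheduled incremental runs that process the available data and then stop, rather than continuous low-latency processing.

  • Lay the data out on the join keys so scans can skip files. Because this table already uses liquid clustering, which replaces partitioning and Z-ordering, check that the clustering keys include the join keys.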


Newer Databricks/Delta Features to Consider

  • The Photon execution engine can speed up scans and joins; check whether your cluster/runtime can use it. Delta Universal Format (UniForm) is aimed at interoperability with other table formats rather than at query speed, so it is unlikely to help here.

  • Databricks SQL Materialized Views: Create a materialized view of just the recent keys (past 15 minutes) to join against your stream, avoiding full table scans. Keep in mind that the view is only as fresh as its most recent refresh.


Example: CDF-Driven Join Pattern

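python
# Enable Change Data Feed on your Delta table
spark.sql("ALTER TABLE ... SET TBLPROPERTIES (delta.enableChangeDataFeed = true)")

# Read the change stream from the table
change_feed = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .table("pixel_tracking_data")
)

# Join your input stream (pixel_stream, as defined in the earlier example) to the change feed.
# In the question's code the tracking table stores the subscriber key as OperatorId.
# Note: Spark requires watermarks and an event-time range condition for a
# stream-stream left outer join, so add them before running this.
result = pixel_stream.join(
    change_feed,
    (pixel_stream.SubscriberGuid == change_feed.OperatorId)
    & (pixel_stream.TrackingTag == change_feed.BTag),
    "left",
)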

Final Tips

  • Schedule OPTIMIZE and VACUUM operations off-hours if possible, so they don’t compete with the streaming workload.

  • Monitor streaming UI “input rows per second” and “stateful operator state size” for any increase in lag.

  • For ongoing high-velocity joins, consider adding a low-latency NoSQL store (such as Redis) as a secondary hot cache for near-real-time keys, while syncing all writes to Delta for durable storage.
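
For the monitoring tip above, the same numbers shown in the streaming UI are also available programmatically from a query's progress report. The sketch below assumes the progress is available as a dict (as returned by `query.lastProgress` in PySpark, which may be `None` before the first batch); the helper names, the `tolerance` threshold, and the sample values are hypothetical.

```python
from typing import Any, Dict


def summarize_progress(progress: Dict[str, Any]) -> Dict[str, float]:
    """Pull lag-related numbers out of a StreamingQuery progress dict."""
    state_ops = progress.get("stateOperators", [])
    return {
        "input_rows_per_second": float(progress.get("inputRowsPerSecond", 0.0)),
        "processed_rows_per_second": float(progress.get("processedRowsPerSecond", 0.0)),
        "state_rows_total": float(sum(op.get("numRowsTotal", 0) for op in state_ops)),
        "state_memory_bytes": float(sum(op.get("memoryUsedBytes", 0) for op in state_ops)),
    }


def is_falling_behind(summary: Dict[str, float], tolerance: float = 0.9) -> bool:
    # Flag the query when it processes rows noticeably slower than they arrive.
    return summary["processed_rows_per_second"] < tolerance * summary["input_rows_per_second"]


# Hypothetical sample shaped like `query.lastProgress`; the numbers are made up.
sample = {
    "inputRowsPerSecond": 1000.0,
    "processedRowsPerSecond": 800.0,
    "stateOperators": [{"numRowsTotal": 5000, "memoryUsedBytes": 1048576}],
}
summary = summarize_progress(sample)
behind = is_falling_behind(summary)
```

A steadily growing `state_rows_total` combined with `behind` staying true over several batches is the kind of trend worth alerting on.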


Summary Table: Recommendations

Solution | Latency | Freshness | Complexity | Cost
Metadata Refresh + AutoOptimize | Medium | High | Medium | Medium
Delta CDF + Stream Join | Low | Highest | High | Medium
Caching “Hot” Keys | Low | High | Medium | Low
NoSQL Hot Cache | Lowest | Highest | High | Highest

Applying Delta CDF streaming joins and/or smart hot key caches, combined with metadata refresh and clustering on the join keys, should give you up-to-date, low-latency lookups at scale.