Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Delta live table not refreshing - window function

ibrar_aslam
New Contributor

We have a list of streaming tables populated by Autoloader from files on S3, which serve as sources for our live tables. After the Autoloader Delta pipeline completes, we trigger a second Delta Live Tables (DLT) pipeline to perform a deduplication operation.

Our current deduplication process computes the rank of the latest record and filters based on this rank. However, we encounter an issue where we need to recreate the schema every time to update the live table, and even a full refresh doesn't resolve the problem.

Could anyone suggest best practices or optimizations for the deduplication process within Delta Live Tables? Additionally, how can we address the need to recreate the schema every time to ensure our live tables are correctly updated?


import dlt
from pyspark.sql import SparkSession


def generate_live_table(live_table_name: str, table_name: str, id_coll: str, spark: SparkSession) -> None:
    """Create or refresh Delta live table
    Args:
        live_table_name: live DLT name
        table_name: base table name
        id_coll: object ID column to partition by
        spark: spark session
    """

    @dlt.table(name=live_table_name, table_properties={"delta.columnMapping.mode": "name", "delta.minReaderVersion": "2", "delta.minWriterVersion": "5"})
    def create_or_refresh_live_table():
        # Rank rows per ID by installment_date (latest first) and keep rank 1.
        return spark.sql(
            f"""
                SELECT *
                FROM (SELECT *, rank() OVER (PARTITION BY `{id_coll}` ORDER BY installment_date DESC) AS row_rank FROM {table_name})
                WHERE row_rank = 1
        """
        )


2 REPLIES

Kaniz_Fatma
Community Manager

Hi @ibrar_aslam, here is a simplified explanation of your situation in flowchart format. Correct me if I am wrong:

 

Start
|
|--- List of streaming tables populated by Autoloader from S3
|       |
|       |--- Sources for live tables
|               |
|               |--- Autoloader Delta pipeline completes
|                       |
|                       |--- Trigger Delta Live Tables (DLT) pipeline
|                               |
|                               |--- Deduplication operation needed
|                                       |
|                                       |--- Current process: Compute rank of latest record
|                                       |       |
|                                       |       |--- Filter based on rank
|                                       |
|                                       |--- Issue: Need to recreate schema every time
|                                               |
|                                               |--- Even full refresh doesn't resolve problem
|
|--- Need suggestions for deduplication optimizations
|       |
|       |--- Address schema recreation issue for live table updates
|
End

 

My suggestions:

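  1. Instead of computing the rank and filtering, use an upsert to deduplicate the data. A MERGE statement inserts, updates, or deletes rows in a target table based on a condition that compares the target table with the source data. Tables managed by DLT are written by the pipeline itself rather than by hand-written MERGE statements, so inside a DLT pipeline the equivalent is the APPLY CHANGES API (dlt.apply_changes in Python), which upserts by key and keeps the latest record according to a sequencing column. The MERGE below applies to a Delta table maintained outside the pipeline.

 

# Example MERGE statement (Delta Lake Python API)
from delta.tables import DeltaTable

# Placeholder path and table name. A streaming source would need foreachBatch
# so that the merge runs once per micro-batch.
source_df = spark.read.format("delta").load("s3://your-bucket/your-path")
target_table = DeltaTable.forName(spark, "your_table")

# Match on the key only; MERGE fails if several source rows match one target
# row, so reduce source_df to one row per id first.
(
  target_table.alias("target")
  .merge(source_df.alias("source"), "source.id = target.id")
  # Overwrite only when the incoming row is newer than the stored one.
  .whenMatchedUpdateAll(condition="source.timestamp > target.timestamp")
  .whenNotMatchedInsertAll()
  .execute()
)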

 

2. Before running the MERGE, reduce the source to one row per key: partition the data by the columns that define the uniqueness of a record, order by the timestamp column in descending order, and keep only the first row, so that the latest record is the one merged.

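3. MERGE can also evolve the schema of the target table. With automatic schema evolution enabled (spark.databricks.delta.schema.autoMerge.enabled set to true), columns that exist in the source but not in the target are added when the merge uses whenMatchedUpdateAll or whenNotMatchedInsertAll. Schema changes are then applied incrementally, without recreating the entire table.

 

# Example MERGE with automatic schema evolution
from delta.tables import DeltaTable

# Allow MERGE to add columns that exist in the source but not in the target.
spark.conf.set("spark.databricks.delta.schema.autoMerge.enabled", "true")

# new_data is a placeholder DataFrame that carries the new columns.
(
  DeltaTable.forName(spark, "your_table").alias("target")
  .merge(new_data.alias("source"), "source.id = target.id")
  .whenMatchedUpdateAll()
  .whenNotMatchedInsertAll()
  .execute()
)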

 

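4. If you prefer not to rely on MERGE for schema updates, you can create the table up front with CREATE TABLE IF NOT EXISTS. This statement creates the table if it doesn't exist; if the table already exists it is left unchanged, so later schema changes still have to be applied separately (for example with ALTER TABLE ... ADD COLUMNS).

 

# Example CREATE TABLE IF NOT EXISTS (Delta Lake Python API)
from delta.tables import DeltaTable

(
  DeltaTable.createIfNotExists(spark)
  .tableName("your_table")
  .addColumns(your_schema)  # your_schema: a pyspark.sql.types.StructType
  .location("s3://your-bucket/your-path")
  .execute()
)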
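
One detail of the ranking query in the question is worth illustrating: rank() gives the same rank to rows that tie on installment_date, so filtering on rank 1 can still leave several rows per ID, whereas row_number() keeps exactly one. Below is a minimal pure-Python sketch of the two behaviours. The helper names (rank_filter, row_number_filter) and the sample rows are made up for illustration and are not part of any Databricks API.

```python
from itertools import groupby
from operator import itemgetter


def rank_filter(rows, key, order_by):
    """Mimic `rank() ... = 1`: keep every row tied for the latest value."""
    kept = []
    ordered = sorted(rows, key=itemgetter(key))
    for _, group in groupby(ordered, key=itemgetter(key)):
        group = list(group)
        latest = max(r[order_by] for r in group)
        kept.extend(r for r in group if r[order_by] == latest)
    return kept


def row_number_filter(rows, key, order_by):
    """Mimic `row_number() ... = 1`: keep exactly one row per key."""
    latest = {}
    for r in rows:
        current = latest.get(r[key])
        # Strict comparison: on a tie the first row seen is kept.
        if current is None or r[order_by] > current[order_by]:
            latest[r[key]] = r
    return list(latest.values())


# Hypothetical sample data; ISO date strings compare correctly as text.
rows = [
    {"id": 1, "installment_date": "2024-01-01", "amount": 10},
    {"id": 1, "installment_date": "2024-02-01", "amount": 20},
    {"id": 1, "installment_date": "2024-02-01", "amount": 25},  # tie on latest date
    {"id": 2, "installment_date": "2024-01-15", "amount": 30},
]

print(len(rank_filter(rows, "id", "installment_date")))        # 3
print(len(row_number_filter(rows, "id", "installment_date")))  # 2
```

If ties on installment_date are possible in your data, a tie-breaking column in the ORDER BY (or row_number() instead of rank()) is what guarantees one row per ID.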

 

By implementing these best practices and optimizations, you can improve the deduplication process within Delta Live Tables and efficiently manage schema updates without re-creating the entire table.
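
Inside a DLT pipeline, keeping the latest row per key is usually expressed with the APPLY CHANGES API rather than a window function. The sketch below shows the general shape, under some assumptions: the source is a streaming dataset visible to the pipeline, the key and sequencing columns exist in it, and the function name define_deduplicated_table is hypothetical. The dlt module is passed in as a parameter only so that the sketch can be exercised outside a pipeline.

```python
def define_deduplicated_table(dlt_module, target, source, key_column, sequence_column):
    """Declare a streaming table that keeps the latest row per key.

    dlt_module is the `dlt` module available inside a DLT pipeline.
    """
    # The target has to be declared as a streaming table before changes
    # can be applied to it.
    dlt_module.create_streaming_table(name=target)

    # Upsert rows from `source` into `target` by key. For each key, the row
    # with the highest sequence value wins. SCD type 1 keeps only the
    # current row and no history.
    dlt_module.apply_changes(
        target=target,
        source=source,
        keys=[key_column],
        sequence_by=sequence_column,
        stored_as_scd_type=1,
    )
```

In a pipeline notebook this would be called as, for example, `import dlt` followed by `define_deduplicated_table(dlt, "installments_latest", "installments_raw", "installment_id", "installment_date")`, where all four names are placeholders for your own tables and columns.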

For those of you who want to understand what Autoloader and Streaming Tables are:

Explaining Autoloader in a way one can understand in one read 👩‍💻:

 

Autoloader in Databricks is like a hungry data monster that eagerly gobbles up new files as they land in your cloud storage buffet!

Imagine a big, friendly monster with a voracious appetite sitting at a cloud storage buffet table. Files in various formats like CSV, JSON, and Parquet are being served to it on platters labelled with their file types. The monster happily devours each file as soon as it arrives, processing them efficiently for your data needs!
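
In code terms, the "monster" is a streaming read with the cloudFiles source. A minimal sketch, assuming an existing Spark session on Databricks; the function name read_new_files and its parameters are hypothetical, and the paths are whatever your bucket uses.

```python
def read_new_files(spark, source_path, file_format, schema_location):
    """Return a streaming DataFrame that picks up files as they arrive."""
    return (
        spark.readStream
        .format("cloudFiles")  # the Auto Loader source
        .option("cloudFiles.format", file_format)  # e.g. "json", "csv", "parquet"
        # Location where the inferred schema is tracked between runs.
        .option("cloudFiles.schemaLocation", schema_location)
        .load(source_path)
    )
```

Each file under source_path is processed once; files that land later are picked up by the same stream on subsequent runs.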

 

Explaining Streaming Tables :-

 

Streaming tables in Databricks SQL are like magic portals that slurp up data as it flows, turning cloud storage chaos into organized datasets with the wave of a wand! 🧙

 

If you have any further questions or need additional guidance, feel free to ask! 😊🚀

Rishabh_Tiwari
Community Manager

Hi @ibrar_aslam ,

Thank you for reaching out to our community! We're here to help you. 

To ensure we provide you with the best support, could you please take a moment to review the response and choose the one that best answers your question? Your feedback not only helps us assist you better but also benefits other community members who may have similar questions in the future.

If you found the answer helpful, consider giving it a kudo. If the response fully addresses your question, please mark it as the accepted solution. This will help us close the thread and ensure your question is resolved.

We appreciate your participation and are here to assist you further if you need it!

Thanks,

Rishabh
