We have a list of streaming tables populated by Auto Loader from files on S3, which serve as sources for our live tables. After the Auto Loader pipeline completes, we trigger a second Delta Live Tables (DLT) pipeline to perform deduplication.
Our current deduplication process ranks records within each ID partition and keeps only the latest one. However, we run into an issue where we have to recreate the schema every time we update the live table, and even a full refresh doesn't resolve the problem.
Could anyone suggest best practices or optimizations for the deduplication process within Delta Live Tables? Additionally, how can we address the need to recreate the schema every time to ensure our live tables are correctly updated?
import dlt
from pyspark.sql import SparkSession


def generate_live_table(live_table_name: str, table_name: str, id_coll: str, spark: SparkSession) -> None:
    """Create or refresh a Delta Live Table deduplicated on the ID column.

    Args:
        live_table_name: target live (DLT) table name
        table_name: source (base) table name
        id_coll: object ID column to partition by when ranking
        spark: spark session
    """

    @dlt.table(
        name=live_table_name,
        table_properties={
            "delta.columnMapping.mode": "name",
            "delta.minReaderVersion": "2",
            "delta.minWriterVersion": "5",
        },
    )
    def create_or_refresh_live_table():
        # Keep only the latest record per ID: rank rows within each ID
        # partition by installment_date (descending) and filter to rank 1.
        return spark.sql(
            f"""
            SELECT *
            FROM (
                SELECT *,
                       rank() OVER (PARTITION BY `{id_coll}` ORDER BY installment_date DESC) AS row_rank
                FROM {table_name}
            )
            WHERE row_rank = 1
            """
        )
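For context, we call this function once per source table from the DLT pipeline notebook, roughly as in the sketch below. The table names, ID columns, and the table_configs list are illustrative placeholders, not our real setup:

# Illustrative usage sketch: placeholder source/target names and ID columns.
spark = SparkSession.builder.getOrCreate()

table_configs = [
    # (source streaming table, target live table, ID column)
    ("bronze.events_raw", "silver.events_latest", "event_id"),
    ("bronze.payments_raw", "silver.payments_latest", "payment_id"),
]

for source_table, live_table, id_column in table_configs:
    generate_live_table(
        live_table_name=live_table,
        table_name=source_table,
        id_coll=id_column,
        spark=spark,
    )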