Hello,
I have set up a DLT pipeline where I ingest data from a Kafka topic into a table. Then, I create another table that filters records from the first table. However, I'm facing an issue:
- In the Data Quality tab, the second table shows a number of written records that is larger than the number of records in the first table.
- However, when I query the second table directly, the row count matches the expected number of records.
I'm trying to understand why the DLT pipeline shows a different number than what is actually written to the table.
Below is a screenshot of the pipeline structure and the code for constructing the second table:
import dlt
from pyspark.sql.functions import col, explode, first, from_json, lit, lower, when

@dlt.table(
    name="bronze_eventlog_inventory_management",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true",
        "pipelines.reset.allowed": "true",
    },
    temporary=False,
)
def create_bronze_table():
    # Keys to pivot out of the parsed JSON map into columns
    master_keys_list = ["column_a", "column_b", "column_c", "column_d", "column_e", "column_f", "column_g"]
    # Only events with these ids are kept
    event_ids_list = ["1", "2", "3", "4", "5", "6", "7", "8"]

    df_src = (
        dlt.read_stream("bronze_eventlog")
        .filter(col("id").isin(event_ids_list))
        # Parse the JSON payload into a map<string,string>
        .withColumn("parsed_data", from_json(col("message_content"), "map<string,string>"))
        # One output row per key/value pair in the map
        .select("x", "y", "z", "id", explode(col("parsed_data")).alias("key", "value"))
        # Normalize any casing of "column_a" to exactly "column_a"
        .withColumn("key", when(lower(col("key")) == "column_a", lit("column_a")).otherwise(col("key")))
    )

    # Pivot back to one row per (x, y, z, id), taking the first non-null value for each key
    df_inv_man = (
        df_src
        .groupBy("x", "y", "z", "id")
        .agg(*[first(when(col("key") == k, col("value")), ignorenulls=True).alias(k) for k in master_keys_list])
    )
    return df_inv_man
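For reference, here is a minimal plain-Python sketch (not Spark) of what the transformation above does to a single batch of input: filter by id, explode the parsed JSON map into one row per key, normalize the "column_a" key, then pivot back to one row per (x, y, z, id). The sample messages, the shortened key and id lists, and the `transform` helper are all hypothetical, chosen only to show how the row count changes at each stage.

```python
import json

# Hypothetical, shortened versions of master_keys_list and event_ids_list
MASTER_KEYS = ["column_a", "column_b"]
EVENT_IDS = {"1", "2"}


def transform(rows):
    """Mimic filter -> from_json -> explode -> groupBy/first on a list of dicts."""
    # Keep only events whose id is in the allow-list (like .filter(col("id").isin(...)))
    filtered = [r for r in rows if r["id"] in EVENT_IDS]

    # Parse the JSON map and emit one row per key/value pair (like explode)
    exploded = []
    for r in filtered:
        for key, value in json.loads(r["message_content"]).items():
            # Normalize any casing of "column_a" (like the when/otherwise step)
            if key.lower() == "column_a":
                key = "column_a"
            exploded.append((r["x"], r["y"], r["z"], r["id"], key, value))

    # Pivot to one row per (x, y, z, id), keeping the first non-null value per key;
    # keys outside MASTER_KEYS are dropped, as in the .agg(...) list comprehension
    grouped = {}
    for x, y, z, id_, key, value in exploded:
        out = grouped.setdefault((x, y, z, id_), {k: None for k in MASTER_KEYS})
        if key in out and out[key] is None and value is not None:
            out[key] = value
    return filtered, exploded, grouped


# Hypothetical sample input
sample_rows = [
    {"x": "a", "y": "b", "z": "c", "id": "1", "message_content": '{"COLUMN_A": "10", "column_b": "20"}'},
    {"x": "a", "y": "b", "z": "c", "id": "9", "message_content": '{"column_a": "99"}'},
    {"x": "d", "y": "e", "z": "f", "id": "2", "message_content": '{"column_b": "30", "extra": "x"}'},
]

filtered, exploded, grouped = transform(sample_rows)
print(len(filtered), len(exploded), len(grouped))  # prints: 2 4 2
```

One difference from the real pipeline: in Spark, `first(..., ignorenulls=True)` inside a `groupBy` does not guarantee which non-null value is picked when a key appears more than once in a group, whereas the sketch simply keeps the first one in input order.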
Can anyone help me understand the discrepancy in the record count between the Data Quality tab and the actual table?
Thanks!