Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Discrepancy in Record Count in DLT Pipeline Data Quality Tab

YS1
Contributor

Hello,

I have set up a DLT pipeline where I ingest data from a Kafka topic into a table. Then, I create another table that filters records from the first table. However, I'm facing an issue:

  • When I check the Data Quality tab for the second table, it shows a large number of written records, larger than the total number of records in the first table.
  • However, when I query the second table directly, the count matches the expected number of records.

I'm trying to understand why the DLT pipeline reports a different count than what is actually written to the table.

Below is a screenshot of the pipeline structure and the code for constructing the second table:

[screenshot of the DLT pipeline graph: YS1_0-1721938771644.png]

import dlt
from pyspark.sql.functions import col, explode, first, from_json, lit, lower, when


@dlt.table(
    name="bronze_eventlog_inventory_management",
    table_properties={
        "quality": "bronze",
        "pipelines.autoOptimize.managed": "true",
        "pipelines.reset.allowed": "true",
    },
    temporary=False,
)
def create_bronze_table():
    master_keys_list = ["column_a", "column_b", "column_c", "column_d",
                        "column_e", "column_f", "column_g"]
    event_ids_list = ["1", "2", "3", "4", "5", "6", "7", "8"]

    # Parse the JSON payload and explode it into one row per key/value pair.
    df_src = (
        dlt.read_stream("bronze_eventlog")
        .filter(col("id").isin(event_ids_list))
        .withColumn("parsed_data", from_json(col("message_content"), "map<string,string>"))
        .select("x", "y", "z", "id", explode(col("parsed_data")).alias("key", "value"))
        .withColumn("key", when(lower(col("key")) == "column_a", lit("column_a"))
                           .otherwise(col("key")))
    )

    # Pivot the exploded key/value pairs back into one row per (x, y, z, id).
    df_inv_man = (
        df_src.groupBy("x", "y", "z", "id")
        .agg(*[first(when(col("key") == k, col("value")), ignorenulls=True).alias(k)
               for k in master_keys_list])
    )

    return df_inv_man

Can anyone help me understand the discrepancy in the record count between the Data Quality tab and the actual table?

Thanks!

2 REPLIES

Kaniz_Fatma
Community Manager

Hi @YS1, DLT applies data quality checks during the transformation process. If your second table filters records (for example, removing duplicates or invalid rows), the Data Quality tab can still show a higher count because it tracks incoming records before they are filtered: the tab reflects the total records processed, including those that are dropped from the final output.

The streaming nature of your pipeline can also contribute to the discrepancy. In streaming scenarios, records are ingested continuously, and the Data Quality tab may count them as they arrive, regardless of whether they end up in the final table. Transient states of the data can therefore be reflected in the Data Quality metrics, producing a higher count than expected.

DLT also uses checkpointing to manage state across streaming operations. If processing is delayed, the Data Quality tab may reflect a backlog of records that have been ingested but not yet processed or filtered, causing temporary discrepancies until the system catches up.
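
A quick way to see exactly what the tab is counting is to query the pipeline's event log. Below is a minimal sketch, assuming your pipeline was created with an explicit storage location (the path is a placeholder) and the documented flow_progress event schema; field names can vary by release:

# Sketch: compare per-flow "records written" counts from the DLT event log.
# Assumes an explicit pipeline storage location; replace the placeholder path.
pipeline_storage = "dbfs:/pipelines/<your-pipeline-id>"  # placeholder

events = spark.read.format("delta").load(f"{pipeline_storage}/system/events")

(events
 .filter("event_type = 'flow_progress'")
 .selectExpr(
     "timestamp",
     "origin.flow_name",
     # details is a JSON string; ':' is Databricks' JSON path operator
     "CAST(details:flow_progress.metrics.num_output_rows AS LONG) AS num_output_rows",
 )
 .orderBy("timestamp")
 .show(truncate=False))

Summing num_output_rows per flow can show how the tab's figure accumulates across micro-batches, which is why it may exceed the current row count of the target table.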

  1. Use Spark's Structured Streaming metrics to monitor ingestion and processing rates; this shows how many records are processed in real time versus how many are filtered out (see the listener sketch after this list).
  2. Verify that the filtering logic for your second table is implemented correctly and matches your expectations; a misconfigured filter can produce unexpected Data Quality metrics.
  3. If the first table can contain duplicate records, add deduplication logic to your transformations so that the final count matches your expectations (see the dedup sketch after this list).
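
For item 1, here is a minimal listener sketch, assuming a plain Structured Streaming job on PySpark 3.4+ (inside a DLT pipeline you would rely on the event log query above instead):

from pyspark.sql.streaming import StreamingQueryListener

class RateListener(StreamingQueryListener):
    # Print per-micro-batch metrics so ingestion vs. output rates can be compared.
    def onQueryStarted(self, event):
        print(f"query started: {event.name} ({event.id})")

    def onQueryProgress(self, event):
        p = event.progress
        print(f"batch {p.batchId}: numInputRows={p.numInputRows}, "
              f"processedRowsPerSecond={p.processedRowsPerSecond}")

    def onQueryTerminated(self, event):
        print(f"query terminated: {event.id}")

spark.streams.addListener(RateListener())

These are the same per-query metrics that the Spark UI's Structured Streaming tab displays. For item 3, a dedup sketch, assuming the source has an event-time column (event_time is a hypothetical name; substitute your actual timestamp column):

# Keep one row per key within the watermark window; the watermark bounds
# the deduplication state so it does not grow without limit.
deduped = (
    dlt.read_stream("bronze_eventlog")
    .withWatermark("event_time", "10 minutes")
    .dropDuplicates(["x", "y", "z", "id", "event_time"])
)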

By addressing these areas, you should be able to account for the discrepancy in the Data Quality tab and confirm that your DLT pipeline operates as expected.

YS1
Contributor

Thank you @Kaniz_Fatma, this makes sense. But can you elaborate on how to use Spark's streaming metrics? Where can they be found, or are there any articles that explain how they work? Thanks!
