Data Engineering

Old files also getting added in DLT Auto Loader

ashraf1395
Valued Contributor

So, I am using Auto Loader in a DLT pipeline for my data ingestion. I am using @dlt.append_flow because I have data to load from multiple sources.

When I load a new file, say x with 3 rows, my target gets 3 rows. But even if I don't load any new file and just rerun the DLT pipeline, 3 more rows get added.

I have set the cloudFiles option includeExistingFiles = False. It only takes effect when I full refresh my pipeline. For example, right now I have 1 file already in my directory, and when I full refresh my DLT pipeline the target table ends up completely empty, because the full refresh truncates the data and includeExistingFiles = False does not allow the existing file to be loaded again. (That is my understanding of the logic; I am not sure whether it is correct.)

But when I upload a new file, say y with 4 rows, and run the DLT pipeline, 4 new rows get added to my target table, for a total of 4 rows. When I rerun the pipeline, it writes file y's 4 rows again, so the target table ends up with 8 rows and duplicate records. I don't understand why this is happening.

I am using directory listing mode. My data is in a Databricks volume, which is also an S3 bucket path, so I think it should work normally.

My file naming convention is like this:

data_file_path/xyz_15122024.csv
data_file_path/xyz_16122024.csv

This is my code:
# Load parameters from the pipeline configuration
import sys
import dlt

target_catalog = spark.conf.get("catalog").lower()
target_schema = spark.conf.get("target_schema").lower()
target_table_name = spark.conf.get("target_table_name").lower()
data_file_path = spark.conf.get("data_file_path")
header = spark.conf.get("header")
file_format = spark.conf.get("file_format")
user = spark.conf.get("user")
evolve_schema = spark.conf.get("evolve_schema")
mode = spark.conf.get("mode")

# Make the ingestion framework importable
sys.path.append(f"/Workspace/Users/{user}/.bundle/wtd_ingestion_framework/dev/files/src/")

# Import necessary modules
from utils.logging_handling import get_logger, log_event, log_error
from utils.dlt_utils import create_dlt_table
from pyspark.sql.functions import current_timestamp, from_utc_timestamp, expr
from pyspark.sql.types import StructType, StructField

def apply_model_extensions(input_df, target_catalog, target_schema, table_name):
    """
    Apply model extensions to the specified table by dynamically fetching the corresponding
    extensions from the Delta table created by the load_model_extensions function.

    Parameters:
    input_df: DataFrame - The streaming DataFrame being ingested.
    target_catalog: str - The target catalog name.
    target_schema: str - The target schema name.
    table_name: str - The name of the table for which extensions are being applied.

    Returns:
    DataFrame: The updated DataFrame with applied model extensions.
    """
    try:
        # Log start of operation
        log_event(logger, f"Loading model extensions for table '{table_name}' from the Delta table.", "INFO")

        # Construct the model extensions table path dynamically
        model_extensions_table = f"{target_catalog}.metadata.model_extension_bb123_loans"

        # Read the model extensions for the specific table
        extension_df = spark.table(model_extensions_table)

        # Validate if there are extensions available for the table
        if extension_df.count() == 0:
            log_event(logger, f"No model extensions found for table '{table_name}'. Skipping.", "INFO")
            # return spark.readStream.table(table_name)
            return input_df

        # Load the target table
        # df = spark.readStream.table(f"live.{target_catalog}.{target_schema}.{table_name}")

        # Apply each extension to the target table
        for row in extension_df.collect():
            column_name = row['column_name']
            data_type = row['data_type']

            # Check if the column already exists to prevent overwriting
            if column_name not in input_df.columns:
                # Determine the default value based on the data type
                if data_type == "timestamp" and "UTC" in column_name:
                    default_value = from_utc_timestamp(current_timestamp(), "UTC")
                elif data_type == "timestamp":
                    default_value = current_timestamp()
                else:
                    default_value = expr("null")

                # Add the column with the default value
                input_df = input_df.withColumn(column_name, default_value.cast(data_type))
                log_event(logger, f"Added column '{column_name}' of type '{data_type}' to table '{table_name}'.", "INFO")
            else:
                log_event(logger, f"Column '{column_name}' already exists in table '{table_name}'. Skipping.", "INFO")

        log_event(logger, f"Successfully applied {extension_df.count()} model extensions to table '{table_name}'.", "INFO")
        return input_df

    except Exception as e:
        log_error(logger, f"Error applying model extensions to table '{table_name}': {str(e)}")
        raise

# Initialize logger
logger = get_logger()


log_event(logger, "Starting file ingestion process.", "INFO")
table_path = f"{target_catalog}.{target_schema}.{target_table_name}"

# 'elements' is a metadata DataFrame of source feed definitions and map_data_type is a
# framework helper that maps metadata types to Spark types (both defined elsewhere, not shown here)
filtered_elements = elements.filter(elements["Source_feed_name"] == "BB1123_loans").orderBy("Position")

# Define schema using elements metadata
schema = StructType([
    StructField(
        row["Entity_Technical_Name"],  # Adjusted to match column key
        map_data_type(row["Datatype"], row["Length"] if "Length" in filtered_elements.columns else None),
        row["Nullable"] == "yes"
    )
    for row in filtered_elements.collect()
])
log_event(logger, f"Schema: {schema}", "INFO")

# These values override the pipeline parameters loaded above
header = "true"
evolve_schema = True
schema_hints = None

if schema and evolve_schema:
    schema_hints = ",".join([f"{field.name} {field.dataType.simpleString()}" for field in schema.fields])

log_event(logger, f"Schema hints: {schema_hints}", "INFO")

# Infer column types only when the files have a header row;
# the schema evolution mode depends only on evolve_schema
infer_column_types = "true" if header == "true" else "false"
schema_evolution_mode = "addNewColumns" if evolve_schema else "rescue"

# Define the cloud file (Auto Loader) configuration
cloudfile = {
    "cloudFiles.format": file_format,
    "cloudFiles.allowOverwrites": "false",
    "cloudFiles.inferColumnTypes": infer_column_types,
    "cloudFiles.schemaLocation": f"{data_file_path}/_schema/{target_table_name}/",
    "cloudFiles.schemaEvolutionMode": schema_evolution_mode,
    "cloudFiles.schemaHints": schema_hints,
    "cloudFiles.useNotifications": "false",
    "cloudFiles.includeExistingFiles": "false"
}

# CSV-specific reader options
file_specific_config = {
    "header": header,      # Pass the header option directly
    "delimiter": "|",      # Set pipe as the delimiter
    "quote": "\"",         # Handle quoted fields
    "escape": "\\"         # Use backslash as escape character
}

table_type = "Streaming"
@dlt.append_flow(
    target = target_table_name,
    name = f"{target_table_name}_{file_format}_ingestion",
    # name = f"{target_table_name}",
    comment = f"{mode} Mode {table_type} DLT table created/updated from {data_file_path}."
)
def source_to_bronze():
    input_df = (spark
          .readStream
          .format("cloudFiles")
          .options(**cloudfile)
          .options(**file_specific_config)
          .option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}")
          .load(data_file_path))
    
    # return df.writeStream.trigger(availableNow=True if mode == "batch" else False)
    input_df = apply_model_extensions(input_df, target_catalog, target_schema, target_table_name)
    log_event(logger, f"Inferred columns: {input_df.columns}", "INFO")

    # Write to Delta
    input_df.writeStream \
        .format("delta") \
        .option("mergeSchema", "true") \
        .outputMode("append") \
        .option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}") \
        .start(table_path)

    return input_df


log_event(logger, f"DLT {table_type} table {target_table_name} created or updated from input path.", "INFO")

 

Most probably I am doing something wrong with the checkpointing or the file naming.

1 ACCEPTED SOLUTION

Alberto_Umana
Databricks Employee

Hi @ashraf1395,

Just a few comments about your question:

The cloudFiles source in Databricks is designed for incremental file processing; it relies on the checkpoint directory to track which files have already been processed.
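For debugging, the cloud_files_state table-valued function can show which files an Auto Loader checkpoint has already recorded as ingested. The checkpoint path below is a placeholder; for a DLT pipeline the checkpoint typically lives under the pipeline's storage location:

# Placeholder checkpoint path -- lists the files this Auto Loader checkpoint has tracked.
spark.sql(
    "SELECT * FROM cloud_files_state('/Volumes/main/raw/_checkpoints/example')"
).show(truncate=False)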

The cloudFiles.includeExistingFiles option determines whether to include files that already exist in the input path or to process only files that arrive after the initial setup. This option is evaluated only when you start a stream for the first time. With includeExistingFiles = false, files already present in the directory are not processed during the first run of the pipeline, but that does not stop those files from being reprocessed if the checkpoint directory is reset.
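For illustration, here is a minimal Auto Loader sketch outside DLT; all paths and table names are placeholders. The processed-file log lives in the checkpoint, so resetting that state (which is what a DLT full refresh does) makes the next update behave like a first start and re-evaluate includeExistingFiles:

# Minimal Auto Loader sketch outside DLT -- paths and table names are placeholders.
df = (spark.readStream
      .format("cloudFiles")
      .option("cloudFiles.format", "csv")
      .option("cloudFiles.includeExistingFiles", "false")  # consulted only on the stream's first start
      .option("cloudFiles.schemaLocation", "/Volumes/main/raw/_schema/example")
      .option("header", "true")
      .load("/Volumes/main/raw/landing/"))

(df.writeStream
   .option("checkpointLocation", "/Volumes/main/raw/_checkpoints/example")  # processed-file tracking lives here
   .trigger(availableNow=True)
   .toTable("main.bronze.example_loans"))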

Since you’re using outputMode("append"), every processed record is appended to the target table. Without deduplication, duplicates from previously processed files will accumulate.

To avoid the duplicates you might want to implement deduplication on the target table.
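As an illustration only, here is a minimal in-pipeline sketch of that idea. The table name bronze_loans, the key column loan_id, and the /Volumes/... paths are placeholders; substitute whatever uniquely identifies a record in your feed. Deduplicating inside the flow keeps the DLT-managed target append-only while dropping re-ingested copies:

import dlt
from pyspark.sql.functions import col

# Placeholder names throughout: bronze_loans, loan_id, and the /Volumes/... paths.
dlt.create_streaming_table("bronze_loans")

@dlt.append_flow(target="bronze_loans", name="bronze_loans_csv_ingestion")
def bronze_loans_ingest():
    df = (spark.readStream
          .format("cloudFiles")
          .option("cloudFiles.format", "csv")
          .option("cloudFiles.schemaLocation", "/Volumes/main/raw/_schema/bronze_loans")
          .option("header", "true")
          .load("/Volumes/main/raw/landing/loans/"))
    # Keep the source file path for traceability, then drop rows whose business key has
    # already been seen. Streaming dropDuplicates keeps its state in the pipeline's
    # checkpoint, so rows replayed in a later update are filtered out. The state is
    # unbounded without a watermark -- fine for a sketch, size it for production.
    return (df.withColumn("source_file", col("_metadata.file_path"))
              .dropDuplicates(["loan_id"]))

Alternatively, dlt.apply_changes keyed on a business key gives you upsert semantics instead of plain appends, if that fits your feed better.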


ashraf1395
Valued Contributor

Got it. I was sensing that, Alberto, but I was not sure. Thank you so much.
