Old files also getting added in dlt autoloader

ashraf1395
Honored Contributor

So , I am using autoloader in a dlt pipeline for my data ingestion. I am using @Dlt.append_flow because I have data to load from multiple sources.

When I load a new file say x it has 3 rows my target gets 3 rows. But next even if I don't load any file and rerun the dlt pipeline 3 more rows are getting added. 
I have used the cloud files config of - includeexistingfiles = False. It is working only when I full refresh my pipelines. For examples now I have 1 file already in my directory and when I full refresh my dlt pipeline the target table get completely empty. bcz full refresh truncates the data and includeexisting files = False is not allowing me to upload new data. ( This is my logic for understand now sure whether correct or not)

But when I upload a new file say y with 4 rows. and run the dlt pipeline 4 new rows get added to my target table with total rows 4. But again when I rerun my pipeline this it again write the files y with 4 news rows in the target table so total 8 rows in my target table with duplicate records. I don't understand why this is happening.

I am using directory listing mode - my data is in my databricks volume which is also an s3 bucket path so I think it will work normally.

my file naming convention is like this - data file path/ xyz_15122024.csv 

data file path/ xyz_16122024.csv

this is my code 

 

# Load parameters
import dlt
target_catalog = spark.conf.get("catalog").lower()
target_schema = spark.conf.get("target_schema").lower()
target_table_name = spark.conf.get("target_table_name").lower()
data_file_path = spark.conf.get("data_file_path")
header = spark.conf.get("header")
file_format = spark.conf.get("file_format")
user = spark.conf.get("user")
evolve_schema = spark.conf.get("evolve_schema")
mode = spark.conf.get("mode")
import sys
sys.path.append(f"/Workspace/Users/{user}/.bundle/wtd_ingestion_framework/dev/files/src/")

# Import necessary modules
from utils.logging_handling import get_logger, log_event, log_error
from utils.dlt_utils import create_dlt_table
from pyspark.sql import SparkSession

def apply_model_extensions(input_df,target_catalog,target_schema,table_name):
    """
    Apply model extensions to the specified table by dynamically fetching the corresponding extensions 
    from the Delta table created by the load_model_extensions function.

    Parameters:
    table_name: str - The name of the table for which extensions are being applied.
    
    Returns:
    DataFrame: The updated DataFrame with applied model extensions.
    """
    try:
        # Log start of operation
        log_event(logger, f"Loading model extensions for table '{table_name}' from the Delta table.", "INFO")

        # Construct the model extensions table path dynamically
        model_extensions_table = f"{target_catalog}.metadata.model_extension_bb123_loans"

        # Read the model extensions for the specific table
        extension_df = spark.table(model_extensions_table)

        # Validate if there are extensions available for the table
        if extension_df.count() == 0:
            log_event(logger, f"No model extensions found for table '{table_name}'. Skipping.", "INFO")
            # return spark.readStream.table(table_name)
            return input_df

        # Load the target table
        # df = spark.readStream.table(f"live.{target_catalog}.{target_schema}.{table_name}")

        # Apply each extension to the target table
        for row in extension_df.collect():
            column_name = row['column_name']
            data_type = row['data_type']

            # Check if the column already exists to prevent overwriting
            if column_name not in input_df.columns:
                # Determine the default value based on the data type
                if data_type == "timestamp" and "UTC" in column_name:
                    default_value = from_utc_timestamp(current_timestamp(), "UTC")
                elif data_type == "timestamp":
                    default_value = current_timestamp()
                else:
                    default_value = expr("null")

                # Add the column with the default value
                input_df = input_df.withColumn(column_name, default_value.cast(data_type))
                log_event(logger, f"Added column '{column_name}' of type '{data_type}' to table '{table_name}'.", "INFO")
            else:
                log_event(logger, f"Column '{column_name}' already exists in table '{table_name}'. Skipping.", "INFO")

        log_event(logger, f"Successfully applied {extension_df.count()} model extensions to table '{table_name}'.", "INFO")
        return input_df

    except Exception as e:
        log_error(logger, f"Error applying model extensions to table '{table_name}': {str(e)}")
        raise

# Initialize logger and Spark session
logger = get_logger()


log_event(logger, "Starting file ingestion process.", "INFO")
table_path = f"{target_catalog}.{target_schema}.{target_table_name}"

filtered_elements = elements.filter(elements["Source_feed_name"] == "BB1123_loans").orderBy("Position")

    # Define schema using elements metadata
schema = StructType([
        StructField(
            row["Entity_Technical_Name"],  # Adjusted to match column key
            map_data_type(row["Datatype"], row["Length"] if "Length" in row else None),
            row["Nullable"] == "yes"
        )
        for row in filtered_elements.filter(elements["Source_feed_name"] == "BB1123_loans").collect()
])
log_event(logger, f"Schema: {schema}", "INFO")

header="true"
evolve_schema=True
schema_hints = None

if schema and evolve_schema:
    schema_hints = ",".join([f"{field.name} {field.dataType.simpleString()}" for field in schema.fields])
  
log_event(logger, f"Schema hints: {schema_hints}", "INFO")

# Determine schema evolution mode based on header and evolve_schema

if header == "true":
    infer_column_types = "true"  # Infer schema from file
    schema_evolution_mode = "addNewColumns" if evolve_schema else "rescue"
else:
    infer_column_types = "false"  # Do not infer schema
    schema_evolution_mode = "addNewColumns" if evolve_schema else "rescue"

# Define the cloud file configuration
cloudfile = {
    "cloudFiles.format": file_format,
    "cloudFiles.allowOverwrites": "false",
    "cloudFiles.inferColumnTypes": infer_column_types,
    "cloudFiles.schemaLocation": f"{data_file_path}/_schema/{table_name}/",
    "cloudFiles.schemaEvolutionMode": schema_evolution_mode,
    "cloudFiles.schemaHints": schema_hints,
    "cloudFiles.useNotifications": "false",
    "cloudFiles.includeExistingFiles": "false"
}
file_specific_config = {
    "header": header,  # Pass the header option directly
    "delimiter": "|",  # Set pipe as the delimiter
    "quote": "\"",     # Handle quoted fields
    "escape": "\\"     # Use backslash as escape character
}

table_type = "Streaming"
@dlt.append_flow(
    target = target_table_name,
    name = f"{target_table_name}_{file_format}_ingestion",
    # name = f"{target_table_name}",
    comment = f"{mode} Mode {table_type} DLT table created/updated from {data_file_path}."
)
def source_to_bronze():
    input_df = (spark
          .readStream
          .format("cloudfiles")
          .options(**cloudfile)
          .options(**file_specific_config)
          .option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}")
          .load(data_file_path))
    
    # return df.writeStream.trigger(availableNow=True if mode == "batch" else False)
    input_df = apply_model_extensions(input_df,target_catalog,target_schema,target_table_name)
    log_event(logger, f"Inferred columns: {input_df.columns}", "INFO")

    # Write to Delta
    input_df.writeStream \
        .format("delta") \
        .option("mergeSchema", "true") \
        .outputMode("append") \
        .option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}") \
        .start(table_path)

    return input_df


log_event(logger, f"DLT {table_type} table {table_name} created or updated from input path.", "INFO")

 

Most probably I am doing something wrong with checkpointing / file naming