Saturday
So, I am using Auto Loader in a DLT pipeline for my data ingestion. I am using @dlt.append_flow because I have data to load from multiple sources.
When I load a new file, say x with 3 rows, my target gets 3 rows. But on the next run, even if I don't load any new file and just rerun the DLT pipeline, 3 more rows get added.
I have set the cloudFiles option includeExistingFiles = False. It only seems to take effect when I full refresh my pipeline. For example, right now I have 1 file already in my directory, and when I full refresh my DLT pipeline the target table ends up completely empty, because the full refresh truncates the data and includeExistingFiles = False does not let the existing file be loaded again. (This is my understanding of the logic; I am not sure whether it is correct.)
But when I upload a new file, say y with 4 rows, and run the DLT pipeline, 4 new rows get added to my target table, for a total of 4 rows. When I rerun the pipeline, it writes file y's 4 rows again, so I end up with 8 rows in my target table, with duplicate records. I don't understand why this is happening.
I am using directory listing mode. My data is in a Databricks volume that is backed by an S3 bucket path, so I think it should work normally.
My file naming convention is like this:
data file path/xyz_15122024.csv
data file path/xyz_16122024.csv
This is my code:
# Load parameters
import dlt

target_catalog = spark.conf.get("catalog").lower()
target_schema = spark.conf.get("target_schema").lower()
target_table_name = spark.conf.get("target_table_name").lower()
data_file_path = spark.conf.get("data_file_path")
header = spark.conf.get("header")
file_format = spark.conf.get("file_format")
user = spark.conf.get("user")
evolve_schema = spark.conf.get("evolve_schema")
mode = spark.conf.get("mode")

import sys
sys.path.append(f"/Workspace/Users/{user}/.bundle/wtd_ingestion_framework/dev/files/src/")

# Import necessary modules
from utils.logging_handling import get_logger, log_event, log_error
from utils.dlt_utils import create_dlt_table
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, from_utc_timestamp, expr
from pyspark.sql.types import StructType, StructField
# NOTE: `elements` (the metadata DataFrame) and `map_data_type` are defined/imported
# elsewhere in the ingestion framework.


def apply_model_extensions(input_df, target_catalog, target_schema, table_name):
    """
    Apply model extensions to the specified table by dynamically fetching the corresponding
    extensions from the Delta table created by the load_model_extensions function.

    Parameters:
        table_name: str - The name of the table for which extensions are being applied.

    Returns:
        DataFrame: The updated DataFrame with applied model extensions.
    """
    try:
        # Log start of operation
        log_event(logger, f"Loading model extensions for table '{table_name}' from the Delta table.", "INFO")
        # Construct the model extensions table path dynamically
        model_extensions_table = f"{target_catalog}.metadata.model_extension_bb123_loans"
        # Read the model extensions for the specific table
        extension_df = spark.table(model_extensions_table)
        # Validate if there are extensions available for the table
        if extension_df.count() == 0:
            log_event(logger, f"No model extensions found for table '{table_name}'. Skipping.", "INFO")
            # return spark.readStream.table(table_name)
            return input_df
        # Load the target table
        # df = spark.readStream.table(f"live.{target_catalog}.{target_schema}.{table_name}")
        # Apply each extension to the target table
        for row in extension_df.collect():
            column_name = row["column_name"]
            data_type = row["data_type"]
            # Check if the column already exists to prevent overwriting
            if column_name not in input_df.columns:
                # Determine the default value based on the data type
                if data_type == "timestamp" and "UTC" in column_name:
                    default_value = from_utc_timestamp(current_timestamp(), "UTC")
                elif data_type == "timestamp":
                    default_value = current_timestamp()
                else:
                    default_value = expr("null")
                # Add the column with the default value
                input_df = input_df.withColumn(column_name, default_value.cast(data_type))
                log_event(logger, f"Added column '{column_name}' of type '{data_type}' to table '{table_name}'.", "INFO")
            else:
                log_event(logger, f"Column '{column_name}' already exists in table '{table_name}'. Skipping.", "INFO")
        log_event(logger, f"Successfully applied {extension_df.count()} model extensions to table '{table_name}'.", "INFO")
        return input_df
    except Exception as e:
        log_error(logger, f"Error applying model extensions to table '{table_name}': {str(e)}")
        raise


# Initialize logger and Spark session
logger = get_logger()
log_event(logger, "Starting file ingestion process.", "INFO")

table_path = f"{target_catalog}.{target_schema}.{target_table_name}"
filtered_elements = elements.filter(elements["Source_feed_name"] == "BB1123_loans").orderBy("Position")

# Define schema using elements metadata
schema = StructType([
    StructField(
        row["Entity_Technical_Name"],  # Adjusted to match column key
        map_data_type(row["Datatype"], row["Length"] if "Length" in row else None),
        row["Nullable"] == "yes"
    )
    for row in filtered_elements.filter(elements["Source_feed_name"] == "BB1123_loans").collect()
])
log_event(logger, f"Schema: {schema}", "INFO")

header = "true"
evolve_schema = True
schema_hints = None
if schema and evolve_schema:
    schema_hints = ",".join([f"{field.name} {field.dataType.simpleString()}" for field in schema.fields])
    log_event(logger, f"Schema hints: {schema_hints}", "INFO")

# Determine schema evolution mode based on header and evolve_schema
if header == "true":
    infer_column_types = "true"   # Infer schema from file
    schema_evolution_mode = "addNewColumns" if evolve_schema else "rescue"
else:
    infer_column_types = "false"  # Do not infer schema
    schema_evolution_mode = "addNewColumns" if evolve_schema else "rescue"

# Define the cloud file configuration
cloudfile = {
    "cloudFiles.format": file_format,
    "cloudFiles.allowOverwrites": "false",
    "cloudFiles.inferColumnTypes": infer_column_types,
    "cloudFiles.schemaLocation": f"{data_file_path}/_schema/{target_table_name}/",
    "cloudFiles.schemaEvolutionMode": schema_evolution_mode,
    "cloudFiles.schemaHints": schema_hints,
    "cloudFiles.useNotifications": "false",
    "cloudFiles.includeExistingFiles": "false"
}

file_specific_config = {
    "header": header,   # Pass the header option directly
    "delimiter": "|",   # Set pipe as the delimiter
    "quote": "\"",      # Handle quoted fields
    "escape": "\\"      # Use backslash as escape character
}

table_type = "Streaming"


@dlt.append_flow(
    target=target_table_name,
    name=f"{target_table_name}_{file_format}_ingestion",
    # name=f"{target_table_name}",
    comment=f"{mode} Mode {table_type} DLT table created/updated from {data_file_path}."
)
def source_to_bronze():
    input_df = (spark
                .readStream
                .format("cloudFiles")
                .options(**cloudfile)
                .options(**file_specific_config)
                .option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}")
                .load(data_file_path))
    # return df.writeStream.trigger(availableNow=True if mode == "batch" else False)
    input_df = apply_model_extensions(input_df, target_catalog, target_schema, target_table_name)
    log_event(logger, f"Inferred columns: {input_df.columns}", "INFO")
    # Write to Delta
    input_df.writeStream \
        .format("delta") \
        .option("mergeSchema", "true") \
        .outputMode("append") \
        .option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}") \
        .start(table_path)
    return input_df


log_event(logger, f"DLT {table_type} table {target_table_name} created or updated from input path.", "INFO")
Most probably I am doing something wrong with the checkpointing or the file naming.
Sunday
Hi @ashraf1395,
Just a few comments about your question:
The cloudFiles source in Databricks is designed for incremental file processing. However, it depends on the checkpoint directory to track which files have been processed.
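For reference, here is a minimal Auto Loader sketch outside DLT (the paths and table name below are placeholders): the checkpointLocation is where Auto Loader records which files it has already ingested, so rerunning the stream against the same checkpoint does not pick those files up again.

# Minimal sketch with placeholder paths/table; the checkpoint tracks processed files.
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "csv")
    .option("cloudFiles.includeExistingFiles", "false")  # evaluated only on the first start
    .option("cloudFiles.schemaLocation", "/Volumes/main/demo/landing/_schema/demo")
    .load("/Volumes/main/demo/landing/")
    .writeStream
    .option("checkpointLocation", "/Volumes/main/demo/landing/_checkpoints/demo")
    .trigger(availableNow=True)
    .toTable("main.demo.bronze_demo"))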
The cloudFiles.includeExistingFiles option determines whether to include files that already exist in the stream's input path or to only process new files that arrive after the initial setup. This option is evaluated only when you start a stream for the first time. With includeExistingFiles = False, files already present in the directory are not processed during the first run of the pipeline, but that does not stop those files from being reprocessed if the checkpoint directory is reset.
Since you're using outputMode("append"), every processed record is appended to the target table. Without deduplication, duplicates from previously processed files will accumulate.
To avoid the duplicates, you might want to implement deduplication on the target table.
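For example, assuming the feed has a business key such as loan_id (a hypothetical column name here), one sketch is to deduplicate inside the flow before DLT appends the rows, reusing the cloudfile and file_specific_config dictionaries from your code; rows that have already landed would still need a one-off DISTINCT or MERGE rewrite of the target table.

import dlt

# Sketch only: dedupe on a business key before the rows reach the target.
@dlt.append_flow(
    target=target_table_name,
    name=f"{target_table_name}_{file_format}_ingestion_dedup"
)
def source_to_bronze_dedup():
    df = (spark.readStream
          .format("cloudFiles")
          .options(**cloudfile)
          .options(**file_specific_config)
          .load(data_file_path))
    # Stateful drop of repeated business keys within the stream
    # (loan_id is a placeholder for the real key column).
    return df.dropDuplicates(["loan_id"])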
Sunday
Got it. I was sensing that, Alberto, but I was not sure. Thank you so much.