- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
12-14-2024 10:35 PM
So , I am using autoloader in a dlt pipeline for my data ingestion. I am using @Dlt.append_flow because I have data to load from multiple sources.
When I load a new file say x it has 3 rows my target gets 3 rows. But next even if I don't load any file and rerun the dlt pipeline 3 more rows are getting added.
I have used the cloud files config of - includeexistingfiles = False. It is working only when I full refresh my pipelines. For examples now I have 1 file already in my directory and when I full refresh my dlt pipeline the target table get completely empty. bcz full refresh truncates the data and includeexisting files = False is not allowing me to upload new data. ( This is my logic for understand now sure whether correct or not)
But when I upload a new file say y with 4 rows. and run the dlt pipeline 4 new rows get added to my target table with total rows 4. But again when I rerun my pipeline this it again write the files y with 4 news rows in the target table so total 8 rows in my target table with duplicate records. I don't understand why this is happening.
I am using directory listing mode - my data is in my databricks volume which is also an s3 bucket path so I think it will work normally.
my file naming convention is like this - data file path/ xyz_15122024.csv
data file path/ xyz_16122024.csv
this is my code
# Load parameters
import dlt
target_catalog = spark.conf.get("catalog").lower()
target_schema = spark.conf.get("target_schema").lower()
target_table_name = spark.conf.get("target_table_name").lower()
data_file_path = spark.conf.get("data_file_path")
header = spark.conf.get("header")
file_format = spark.conf.get("file_format")
user = spark.conf.get("user")
evolve_schema = spark.conf.get("evolve_schema")
mode = spark.conf.get("mode")
import sys
sys.path.append(f"/Workspace/Users/{user}/.bundle/wtd_ingestion_framework/dev/files/src/")
# Import necessary modules
from utils.logging_handling import get_logger, log_event, log_error
from utils.dlt_utils import create_dlt_table
from pyspark.sql import SparkSession
def apply_model_extensions(input_df,target_catalog,target_schema,table_name):
"""
Apply model extensions to the specified table by dynamically fetching the corresponding extensions
from the Delta table created by the load_model_extensions function.
Parameters:
table_name: str - The name of the table for which extensions are being applied.
Returns:
DataFrame: The updated DataFrame with applied model extensions.
"""
try:
# Log start of operation
log_event(logger, f"Loading model extensions for table '{table_name}' from the Delta table.", "INFO")
# Construct the model extensions table path dynamically
model_extensions_table = f"{target_catalog}.metadata.model_extension_bb123_loans"
# Read the model extensions for the specific table
extension_df = spark.table(model_extensions_table)
# Validate if there are extensions available for the table
if extension_df.count() == 0:
log_event(logger, f"No model extensions found for table '{table_name}'. Skipping.", "INFO")
# return spark.readStream.table(table_name)
return input_df
# Load the target table
# df = spark.readStream.table(f"live.{target_catalog}.{target_schema}.{table_name}")
# Apply each extension to the target table
for row in extension_df.collect():
column_name = row['column_name']
data_type = row['data_type']
# Check if the column already exists to prevent overwriting
if column_name not in input_df.columns:
# Determine the default value based on the data type
if data_type == "timestamp" and "UTC" in column_name:
default_value = from_utc_timestamp(current_timestamp(), "UTC")
elif data_type == "timestamp":
default_value = current_timestamp()
else:
default_value = expr("null")
# Add the column with the default value
input_df = input_df.withColumn(column_name, default_value.cast(data_type))
log_event(logger, f"Added column '{column_name}' of type '{data_type}' to table '{table_name}'.", "INFO")
else:
log_event(logger, f"Column '{column_name}' already exists in table '{table_name}'. Skipping.", "INFO")
log_event(logger, f"Successfully applied {extension_df.count()} model extensions to table '{table_name}'.", "INFO")
return input_df
except Exception as e:
log_error(logger, f"Error applying model extensions to table '{table_name}': {str(e)}")
raise
# Initialize logger and Spark session
logger = get_logger()
log_event(logger, "Starting file ingestion process.", "INFO")
table_path = f"{target_catalog}.{target_schema}.{target_table_name}"
filtered_elements = elements.filter(elements["Source_feed_name"] == "BB1123_loans").orderBy("Position")
# Define schema using elements metadata
schema = StructType([
StructField(
row["Entity_Technical_Name"], # Adjusted to match column key
map_data_type(row["Datatype"], row["Length"] if "Length" in row else None),
row["Nullable"] == "yes"
)
for row in filtered_elements.filter(elements["Source_feed_name"] == "BB1123_loans").collect()
])
log_event(logger, f"Schema: {schema}", "INFO")
header="true"
evolve_schema=True
schema_hints = None
if schema and evolve_schema:
schema_hints = ",".join([f"{field.name} {field.dataType.simpleString()}" for field in schema.fields])
log_event(logger, f"Schema hints: {schema_hints}", "INFO")
# Determine schema evolution mode based on header and evolve_schema
if header == "true":
infer_column_types = "true" # Infer schema from file
schema_evolution_mode = "addNewColumns" if evolve_schema else "rescue"
else:
infer_column_types = "false" # Do not infer schema
schema_evolution_mode = "addNewColumns" if evolve_schema else "rescue"
# Define the cloud file configuration
cloudfile = {
"cloudFiles.format": file_format,
"cloudFiles.allowOverwrites": "false",
"cloudFiles.inferColumnTypes": infer_column_types,
"cloudFiles.schemaLocation": f"{data_file_path}/_schema/{table_name}/",
"cloudFiles.schemaEvolutionMode": schema_evolution_mode,
"cloudFiles.schemaHints": schema_hints,
"cloudFiles.useNotifications": "false",
"cloudFiles.includeExistingFiles": "false"
}
file_specific_config = {
"header": header, # Pass the header option directly
"delimiter": "|", # Set pipe as the delimiter
"quote": "\"", # Handle quoted fields
"escape": "\\" # Use backslash as escape character
}
table_type = "Streaming"
@dlt.append_flow(
target = target_table_name,
name = f"{target_table_name}_{file_format}_ingestion",
# name = f"{target_table_name}",
comment = f"{mode} Mode {table_type} DLT table created/updated from {data_file_path}."
)
def source_to_bronze():
input_df = (spark
.readStream
.format("cloudfiles")
.options(**cloudfile)
.options(**file_specific_config)
.option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}")
.load(data_file_path))
# return df.writeStream.trigger(availableNow=True if mode == "batch" else False)
input_df = apply_model_extensions(input_df,target_catalog,target_schema,target_table_name)
log_event(logger, f"Inferred columns: {input_df.columns}", "INFO")
# Write to Delta
input_df.writeStream \
.format("delta") \
.option("mergeSchema", "true") \
.outputMode("append") \
.option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}") \
.start(table_path)
return input_df
log_event(logger, f"DLT {table_type} table {table_name} created or updated from input path.", "INFO")
Most probably I am doing something wrong with checkpointing / file naming