06-02-2023 07:48 AM
Thanks. Since I'm running this in triggered mode, it feels like it's reprocessing existing files. I wonder if I'm missing something in my definition of the bronze table? Shouldn't it only process new files (as long as I'm not doing a full refresh)? The existing files are never changed; it should just read in the one new file that is received each day.
Here is my code for Bronze:
import dlt
from pyspark.sql.functions import col, current_timestamp, to_date, substring

@dlt.table(
    name=tableNameBronze,
    comment="Raw data ingested from bronze",
    table_properties={
        "myCompanyPipeline.quality": "bronze",
        "pipelines.autoOptimize.managed": "true"
    }
)
def create_bronze():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.inferColumnTypes", "false")
        .option("cloudFiles.schemaHints", schemaHintsBronze)
        .option("cloudFiles.format", sourceFormat)
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .option("cloudFiles.rescuedDataColumn", "__RescuedData")
        .option("pathGlobFilter", fileNamePrefix)
        .load(dataPathBronze)
        .select(
            "*",
            col("_metadata.file_name").alias("__SourceFile"),
            current_timestamp().alias("__IngestionDate"),
            to_date(substring(col("_metadata.file_name"), -21, 8), "yyyyMMdd").alias("__EffectiveStartDate")
        )
    )
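For context, my mental model of why Auto Loader should only pick up new files is that it keeps a ledger of already-discovered files in the stream's checkpoint, and a full refresh wipes that ledger. Here is a rough sketch of that behavior in plain Python (the ledger structure and function name are my own illustration, not the actual Auto Loader implementation):

```python
# Illustrative sketch only: Auto Loader's real file tracking lives in the
# stream's checkpoint, which DLT manages for the pipeline automatically.

def discover_new_files(listed_files, checkpoint):
    """Return only files not yet recorded in the checkpoint ledger."""
    new_files = [f for f in listed_files if f not in checkpoint]
    checkpoint.update(new_files)  # record them so later runs skip them
    return new_files

checkpoint = set()

# Day 1: one file arrives and is ingested.
day1 = discover_new_files(["sales_20230601.csv"], checkpoint)

# Day 2: only the newly arrived file is ingested, not the old one.
day2 = discover_new_files(["sales_20230601.csv", "sales_20230602.csv"], checkpoint)

# A full refresh clears the checkpoint, so everything is reprocessed.
checkpoint.clear()
refresh = discover_new_files(["sales_20230601.csv", "sales_20230602.csv"], checkpoint)
```

So as long as the checkpoint survives between triggered runs, each run should ingest only the files that arrived since the previous run.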