js54123875
New Contributor III

Thanks. Since I'm running this in triggered mode, it feels like it's reprocessing existing files. I wonder if I'm missing something in my definition of the bronze table. Shouldn't it only process new files (as long as I'm not doing a full refresh)? The existing files are not being changed, so it should just read in the new file that is received each day.

Here is my code for Bronze:

import dlt
from pyspark.sql.functions import col, current_timestamp, substring, to_date

# tableNameBronze, schemaHintsBronze, sourceFormat, fileNamePrefix and
# dataPathBronze are defined elsewhere in the pipeline code (not shown).
@dlt.table(
    name=tableNameBronze,
    comment="Raw data ingested from bronze",
    table_properties={
        "myCompanyPipeline.quality": "bronze",
        "pipelines.autoOptimize.managed": "true",
    },
)
def create_bronze():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.inferColumnTypes", "false")
        .option("cloudFiles.schemaHints", schemaHintsBronze)
        .option("cloudFiles.format", sourceFormat)
        .option("cloudFiles.schemaEvolutionMode", "rescue")
        .option("cloudFiles.rescuedDataColumn", "__RescuedData")
        .option("pathGlobfilter", fileNamePrefix)
        .load(dataPathBronze)
        .select(
            "*",
            col("_metadata.file_name").alias("__SourceFile"),
            current_timestamp().alias("__IngestionDate"),
            # The 8 characters starting 21 from the end of the file name hold the date (yyyyMMdd).
            to_date(substring(col("_metadata.file_name"), -21, 8), "yyyyMMdd").alias("__EffectiveStartDate"),
        )
    )
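
In case it helps anyone reading the `__EffectiveStartDate` line, here is a rough plain-Python sketch of what that expression is meant to do: take the 8 characters that start 21 characters from the end of the file name and parse them as `yyyyMMdd`. The helper name `effective_start_date` and the sample file name are made up for illustration (they are not my real names), and the sketch assumes the file name is at least 21 characters long.

```python
from datetime import date, datetime


def effective_start_date(file_name: str) -> date:
    """Mimic to_date(substring(file_name, -21, 8), 'yyyyMMdd') for names of 21+ characters."""
    # substring(..., -21, 8) starts 21 characters from the end and keeps 8 characters,
    # which corresponds to the Python slice [-21:-13].
    date_part = file_name[-21:-13]
    return datetime.strptime(date_part, "%Y%m%d").date()


# Hypothetical file name whose last 21 characters are "20230115_000000.jsonl".
sample_name = "myfeed_20230115_000000.jsonl"
parsed = effective_start_date(sample_name)
```

With a name laid out like the sample, `parsed` comes back as January 15, 2023. If the suffix after the date is a different length, the offset `-21` would need to change to match.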