Hi Community,
I'm trying to load data from the landing zone to the bronze layer via DLT- Autoloader, I want to add a column record_id to the bronze table while I fetch my data. I'm also using file arrival trigger in the workflow to update my table incrementally.
I've followed a post in the community forum using monotonically_increasing_id() but it's not supported in the streaming use case. I also do not have any event-time based column to enable watermarking.
Attaching code snippet for reference:
 
def create_bronze_tables(
    source_file_path=None,
    target_table_name=None,
    primary_keys=None
):
    @Dlt.table(name=f"{target_table_name}")
    def create_sample_tables():
        table = (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("inferColumnTypes", True)
            .option('header','true')
            .option("delimiter", "\t")
            .option("mergeSchema", "true")
            .load(f"{source_file_path}")
            .withColumn("record_id", row_number().over(
                Window.orderBy(monotonically_increasing_id())))
        )
        return table
 
the error I face is attached.
Any help in how to create a record-id in this setting is appreciated.
					
				
			
			
				
	Riz