Hi Community,
I'm trying to load data from the landing zone into the bronze layer using DLT with Auto Loader, and I want to add a record_id column to the bronze table as the data is ingested. I'm also using a file arrival trigger in the workflow to update the table incrementally.
I followed a community forum post that uses monotonically_increasing_id(), but that approach isn't supported in my streaming use case. I also don't have an event-time column that I could use for watermarking.
Attaching code snippet for reference:
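import dlt
from pyspark.sql import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number

def create_bronze_tables(
    source_file_path=None,
    target_table_name=None,
    primary_keys=None
):
    # Registers one streaming bronze table fed by Auto Loader
    @dlt.table(name=target_table_name)
    def create_sample_tables():
        table = (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("header", "true")
            .option("delimiter", "\t")
            .option("mergeSchema", "true")
            .load(source_file_path)
            # Attempted record_id: row_number() over a global, non-time-based window
            .withColumn("record_id", row_number().over(
                Window.orderBy(monotonically_increasing_id())))
        )
        return table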
The error I get is attached.
Any help with creating a record_id in this setting is appreciated.
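To make the requirement concrete: the record_id does not have to be sequential, so a stateless, per-row identifier would be acceptable. The sketch below shows that idea in plain Python, assuming a hash of the source file path plus the row values is good enough. The helper name make_record_id and the example paths are hypothetical and only for illustration. In Spark the same idea could presumably be expressed with sha2 and concat_ws from pyspark.sql.functions, but I have not verified that inside a DLT pipeline.

```python
import hashlib


def make_record_id(file_path, row_values, sep="||"):
    """Return a deterministic hex id for one row (hypothetical helper).

    The id depends only on the source file path and the row's own values,
    so it needs no ordering across the stream and no state between batches.
    """
    # Treat missing values as empty strings so None does not break the join
    parts = [file_path] + ["" if v is None else str(v) for v in row_values]
    return hashlib.sha256(sep.join(parts).encode("utf-8")).hexdigest()


# Example with made-up paths and values
id_a = make_record_id("/landing/file_1.csv", ["1", "alice"])
id_b = make_record_id("/landing/file_2.csv", ["1", "alice"])
print(id_a == id_b)  # False: the same row in a different file gets a different id
```

One limitation of this approach: two fully identical rows in the same file would get the same id, so it only works if that is tolerable or if a row position can be added to the hash.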
Riz