Unable to create a record_id column via DLT - Autoloader

RiyazAli
Contributor III

Hi Community,

I'm trying to load data from the landing zone to the bronze layer via DLT Auto Loader, and I want to add a record_id column to the bronze table while ingesting the data. I'm also using a file arrival trigger in the workflow to update the table incrementally.

I followed a post in the community forum that uses monotonically_increasing_id(), but it isn't supported in streaming queries. I also don't have any event-time column to enable watermarking.

Attaching code snippet for reference:


import dlt
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window


def create_bronze_tables(
    source_file_path=None,
    target_table_name=None,
    primary_keys=None,
):
    @dlt.table(name=target_table_name)
    def create_sample_tables():
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "csv")
            .option("cloudFiles.inferColumnTypes", "true")
            .option("header", "true")
            .option("delimiter", "\t")
            .option("mergeSchema", "true")
            .load(source_file_path)
            # This window has no time-based ordering, so it is rejected
            # in a streaming query -- this is the line that errors out.
            .withColumn(
                "record_id",
                row_number().over(Window.orderBy(monotonically_increasing_id())),
            )
        )


The error I'm facing is attached.

Any help on how to create a record_id in this setting is appreciated.
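[Editor's note: not part of the original post.] One streaming-compatible workaround, sketched under the assumption that a globally unique identifier is acceptable in place of a consecutive integer, is to generate the record_id with Spark's built-in uuid() SQL function, which is evaluated per row and needs no ordering or watermark. The table name and path below are hypothetical placeholders:

```python
import dlt
from pyspark.sql.functions import expr

# Hypothetical table name and source path, for illustration only.
@dlt.table(name="bronze_sample")
def bronze_with_record_id():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load("/landing/sample")
        # uuid() works in streaming queries, unlike a row_number() window,
        # but it yields a random unique string, not a sequential number.
        .withColumn("record_id", expr("uuid()"))
    )
```

If a strictly sequential id is required, it would have to be assigned downstream (e.g. in a batch job over the bronze table), since streaming sources have no global ordering to number rows against.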

1 REPLY

RiyazAli
Contributor III

Hey @Kaniz - could you or anybody from the community team help me here, please? I've been stuck for quite some time now.
