Autoloader, toTable

ShlomoSQM
New Contributor

"In Auto Loader there is the option ".toTable(catalog.volume.table_name)". I have an Auto Loader script that reads all the files from a source volume in Unity Catalog. Inside the source I have two different kinds of files with two different schemas.
I want to send them to two tables: catalog.volume.table_name1 (for files of type 1) and catalog.volume.table_name2 (for files of type 2). How can I specify this in the readStream options or in the writeStream options?"

 

shan_chandra
Databricks Employee

Hi @ShlomoSQM - Please try the steps below and let us know if they work:

1) While reading the stream with Auto Loader, add a column for the file name (using withColumn and the input_file_name() function).

2) Split the DataFrame (df) into two (df1 and df2) based on file type, for example by checking whether the file name contains type1 or type2, and apply schema 1 and schema 2 respectively.

3) Using a merge statement, insert/update the data into table1 and table2 from the respective DataFrames (df1 for type1, df2 for type2).
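
The steps above can be sketched roughly as follows. This is a minimal sketch, not a definitive implementation. It assumes the file names contain the markers "type1" and "type2" (the ROUTES mapping is hypothetical), that `spark` is the notebook's session, and that the `_metadata.file_path` column is available on the cluster, which is used here in place of input_file_name(). For brevity each part is appended rather than merged. An upsert would go in the same place inside route_batch.

```python
# Hypothetical routing rules: substring of the file name -> target table.
ROUTES = {
    "type1": "catalog.volume.table_name1",
    "type2": "catalog.volume.table_name2",
}


def like_filter(marker, column="source_file"):
    """Build a SQL predicate matching rows whose source file path contains marker."""
    # Note: "_" and "%" inside marker would act as LIKE wildcards.
    return f"{column} LIKE '%{marker}%'"


def route_batch(batch_df, batch_id):
    """Steps 2 and 3: split one micro-batch by file name and append each part."""
    for marker, table in ROUTES.items():
        subset = batch_df.filter(like_filter(marker)).drop("source_file")
        # Append is an assumption made to keep the sketch short (the post suggests merge).
        subset.write.format("delta").mode("append").saveAsTable(table)


def start_stream(spark, source_path, checkpoint_path):
    """Step 1: read with Auto Loader and record each row's source file path."""
    df = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        .option("cloudFiles.schemaLocation", checkpoint_path)
        .load(source_path)
        .selectExpr("*", "_metadata.file_path AS source_file")
    )
    return (
        df.writeStream.foreachBatch(route_batch)
        .option("checkpointLocation", checkpoint_path)
        .start()
    )
```

Because both file types are read by a single stream, the inferred schema is the union of the two, so each subset may still need a select() of its own columns before it is written.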

Palash01
Valued Contributor

Hey @ShlomoSQM, looks like @shan_chandra suggested a feasible solution. Just to add a little more context, this is how you can achieve the same thing if you have a column that identifies which rows are type 1 and which are type 2:

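from pyspark.sql.functions import col

# Auto Loader is selected with format("cloudFiles"); schema inference also needs a schema location
# (the checkpoint paths are reused for that here, which is an assumption)
file_type1_stream = spark.readStream.format("cloudFiles") \
                              .option("cloudFiles.format", "parquet") \
                              .option("cloudFiles.schemaLocation", "/mnt/checkpoints/type1") \
                              .load("/mnt/source_volume") \
                              .filter(col("file_type") == "type1")  # Assuming a column indicating file type

file_type2_stream = spark.readStream.format("cloudFiles") \
                              .option("cloudFiles.format", "parquet") \
                              .option("cloudFiles.schemaLocation", "/mnt/checkpoints/type2") \
                              .load("/mnt/source_volume") \
                              .filter(col("file_type") == "type2")

# Each stream gets its own checkpoint and writes to its own table
file_type1_stream.writeStream.format("delta") \
                            .option("checkpointLocation", "/mnt/checkpoints/type1") \
                            .toTable("catalog.volume.table_name1")

file_type2_stream.writeStream.format("delta") \
                            .option("checkpointLocation", "/mnt/checkpoints/type2") \
                            .toTable("catalog.volume.table_name2")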
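
If there is no such column and the two file types can only be told apart by file name, one alternative (a different technique from the filter above) is to give each stream a `pathGlobFilter` so it only picks up its own files and infers only its own schema. The sketch below assumes hypothetical glob patterns such as "*type1*" and reuses the checkpoint path as the schema location. The helper names are made up for illustration.

```python
def autoloader_options(file_glob, schema_location, file_format="parquet"):
    """Build Auto Loader reader options for one file type."""
    return {
        "cloudFiles.format": file_format,
        "cloudFiles.schemaLocation": schema_location,
        # Only files whose name matches this glob are ingested by the stream.
        "pathGlobFilter": file_glob,
    }


def start_typed_stream(spark, source_path, file_glob, checkpoint_path, table):
    """Start one stream that loads a single file type into a single table."""
    reader = spark.readStream.format("cloudFiles").options(
        **autoloader_options(file_glob, checkpoint_path)
    )
    return (
        reader.load(source_path)
        .writeStream.option("checkpointLocation", checkpoint_path)
        .toTable(table)
    )


# Example calls (hypothetical globs; `spark` is the notebook's session):
# start_typed_stream(spark, "/mnt/source_volume", "*type1*",
#                    "/mnt/checkpoints/type1", "catalog.volume.table_name1")
# start_typed_stream(spark, "/mnt/source_volume", "*type2*",
#                    "/mnt/checkpoints/type2", "catalog.volume.table_name2")
```

Each call keeps a separate checkpoint, so the two streams track their processed files independently.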

 

Leave a like if this helps! Kudos,
Palash