Autoloader, toTable
01-23-2024 06:26 AM
With Auto Loader I can write a stream to a table using ".toTable(catalog.volume.table_name)". I have an Auto Loader script that reads all the files from a source volume in Unity Catalog. The source contains two kinds of files with two different schemas.
I want to send them to two tables: catalog.volume.table_name1 (for files of type 1) and catalog.volume.table_name2 (for files of type 2). How can I specify this in the readStream options or in the writeStream options?
01-23-2024 08:08 AM - edited 01-23-2024 08:09 AM
Hi @ShlomoSQM, please try the steps below and let us know if they work:
1) While reading the data with Auto Loader (readStream), add a column holding the source file name, using withColumn with the input_file_name() function.
2) Split the DataFrame into two (df1 and df2) by file type, for example by checking whether the file name contains "type1" or "type2", and apply schema 1 and schema 2 respectively.
3) Using a MERGE statement, insert/update the data into table1 and table2 from the respective DataFrames (df1 for type1, df2 for type2). On a streaming DataFrame this is typically done per micro-batch inside foreachBatch.
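In Spark, step 1 would be a withColumn call using input_file_name() (Databricks also exposes the path as _metadata.file_path), step 2 two filters on that column, and step 3 a MERGE per micro-batch. The routing and upsert logic itself can be sketched in plain Python, independent of Spark. This is an illustration only: the "type1"/"type2" file-name markers, the key column "id", and the sample rows are hypothetical assumptions, and a dict stands in for each Delta table.

```python
from collections import defaultdict

# Hypothetical naming convention: type-1 files contain "type1" in their name,
# type-2 files contain "type2". Replace with your real file-name pattern.
ROUTES = {
    "type1": "catalog.volume.table_name1",
    "type2": "catalog.volume.table_name2",
}


def route(file_path):
    """Step 2: choose the target table from the source file name."""
    file_name = file_path.rsplit("/", 1)[-1]
    for marker, table in ROUTES.items():
        if marker in file_name:
            return table
    return None  # unrecognised file: not routed anywhere


def merge_into(table, rows, key):
    """Step 3: MERGE semantics, update rows whose key exists, insert the rest."""
    for row in rows:
        table[row[key]] = row


def process_batch(batch, tables, key="id"):
    """Run steps 1-3 on one micro-batch of (source_file, row) pairs."""
    split = defaultdict(list)  # target table -> its rows (df1 / df2)
    for source_file, row in batch:  # step 1: each row carries its file name
        target = route(source_file)
        if target is not None:
            split[target].append(row)
    for target, rows in split.items():
        merge_into(tables[target], rows, key)


# Usage: the second type1 row updates id 1 instead of inserting a duplicate.
tables = {name: {} for name in ROUTES.values()}
process_batch(
    [
        ("/mnt/source_volume/orders_type1_001.parquet", {"id": 1, "amount": 10}),
        ("/mnt/source_volume/customers_type2_001.parquet", {"id": 7, "name": "a"}),
        ("/mnt/source_volume/orders_type1_002.parquet", {"id": 1, "amount": 15}),
    ],
    tables,
)
```

Note that a plain substring check like "type1" in the file name would also match a name such as "type10"; a stricter pattern may be needed depending on how the files are actually named.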
01-23-2024 10:31 PM
Hey @ShlomoSQM, it looks like @shan_chandra suggested a feasible solution. Just to add a little more context, this is how you can achieve the same thing if you have a column that identifies which rows are type1 and which are type2:
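from pyspark.sql.functions import col

# spark is the SparkSession predefined in Databricks notebooks.
# Each Auto Loader stream gets its own schema location and checkpoint.
file_type1_stream = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "/mnt/checkpoints/type1")
    .load("/mnt/source_volume")
    .filter(col("file_type") == "type1"))  # assumes a column indicating file type

file_type2_stream = (spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "/mnt/checkpoints/type2")
    .load("/mnt/source_volume")
    .filter(col("file_type") == "type2"))

# Two independent queries; toTable() starts each one.
(file_type1_stream.writeStream.format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/type1")
    .toTable("catalog.volume.table_name1"))

(file_type2_stream.writeStream.format("delta")
    .option("checkpointLocation", "/mnt/checkpoints/type2")
    .toTable("catalog.volume.table_name2"))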
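The important detail in this approach is that each query has its own checkpointLocation; two streaming queries should not share one. If more file types are added later, one option is to generate the per-type settings from a single list instead of copying the stream code. A minimal plain-Python sketch, assuming the same hypothetical naming scheme as above (file_type values, /mnt/checkpoints/<type>, catalog.volume.table_name<N>); the commented lines show how each settings dict might drive one Spark query.

```python
# Hypothetical settings mirroring the names used in the example above.
FILE_TYPES = ["type1", "type2"]
CHECKPOINT_ROOT = "/mnt/checkpoints"
TABLE_PREFIX = "catalog.volume.table_name"


def stream_settings(file_types):
    """One settings dict per file type; each stream gets its own checkpoint."""
    return [
        {
            "filter_value": file_type,  # compared against the file_type column
            "checkpoint": f"{CHECKPOINT_ROOT}/{file_type}",
            "table": f"{TABLE_PREFIX}{i}",
        }
        for i, file_type in enumerate(file_types, start=1)
    ]


configs = stream_settings(FILE_TYPES)
for cfg in configs:
    print(cfg["filter_value"], cfg["checkpoint"], cfg["table"])

# In a Databricks notebook, each cfg could drive one query, e.g.:
# (spark.readStream.format("cloudFiles")
#     .option("cloudFiles.format", "parquet")
#     .option("cloudFiles.schemaLocation", cfg["checkpoint"])
#     .load("/mnt/source_volume")
#     .filter(col("file_type") == cfg["filter_value"])
#     .writeStream.option("checkpointLocation", cfg["checkpoint"])
#     .toTable(cfg["table"]))
```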
Palash