
Autoloader, toTable

ShlomoSQM
New Contributor

Auto Loader has the option ".toTable(catalog.volume.table_name)". I have an Auto Loader script that reads all the files from a source volume in Unity Catalog. Inside the source there are two kinds of files, each with a different schema.
I want to send them to two tables: catalog.volume.table_name1 (for files of type 1) and catalog.volume.table_name2 (for files of type 2). How can I specify this in the readStream options or in the writeStream options?

 
2 REPLIES 2

shan_chandra
Esteemed Contributor

Hi @ShlomoSQM - Please try the steps below and let us know if they work.

1) While reading the data with Auto Loader (readStream), add a column for the source file name using withColumn and the input_file_name() function.

2) Split the DataFrame df into two, df1 and df2, based on file type (for example, by whether the file name contains type1 or type2), applying schema 1 and schema 2 respectively.

3) Using a MERGE statement, insert or update the data in table1 from df1 (type 1) and in table2 from df2 (type 2).
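The steps above can be sketched roughly as follows. This is only an illustration under several assumptions, not a tested pipeline: the markers "type1"/"type2" in the file name, the column lists, and the helper names (TABLES_BY_MARKER, file_type, write_batch, start) are all hypothetical. It uses the _metadata.file_name column instead of input_file_name(), since input_file_name() may not be available on Unity Catalog compute, and it appends each part rather than running a MERGE, to keep the example short.

```python
import os

# Hypothetical mapping: marker in the file name -> (target table, columns of that schema).
TABLES_BY_MARKER = {
    "type1": ("catalog.volume.table_name1", ["id", "name"]),
    "type2": ("catalog.volume.table_name2", ["id", "amount"]),
}

def file_type(path, routes=TABLES_BY_MARKER):
    """Return the first marker contained in the file name of `path`, else None."""
    name = os.path.basename(path)
    for marker in routes:
        if marker in name:
            return marker
    return None

def write_batch(batch_df, batch_id):
    """foreachBatch handler: split one micro-batch by source file name and append each part."""
    from pyspark.sql.functions import col  # lazy import so file_type() works without Spark
    for marker, (table, columns) in TABLES_BY_MARKER.items():
        # Keep only the rows from files of this type and only that type's columns.
        part = batch_df.filter(col("source_file").contains(marker)).select(*columns)
        part.write.mode("append").saveAsTable(table)

def start(spark, source_path, checkpoint):
    """Start a single Auto Loader stream that routes rows to both tables."""
    from pyspark.sql.functions import col
    df = (spark.readStream.format("cloudFiles")
          .option("cloudFiles.format", "parquet")
          .option("cloudFiles.schemaLocation", checkpoint)
          .load(source_path)
          # Step 1: record which file each row came from.
          .withColumn("source_file", col("_metadata.file_name")))
    # Steps 2 and 3 happen per micro-batch inside write_batch.
    return (df.writeStream
            .foreachBatch(write_batch)
            .option("checkpointLocation", checkpoint)
            .start())
```

Because one stream reads both kinds of files, the inferred schema would have to cover the columns of both types, which is why each part selects only its own columns before writing.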

Palash01
Valued Contributor

Hey @ShlomoSQM, it looks like @shan_chandra suggested a feasible solution. Just to add a little more context, this is how you can achieve the same thing if you have a column that identifies what is type 1 and what is type 2:

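from pyspark.sql.functions import col

# Read the source with Auto Loader; each stream tracks its inferred schema in its own location.
file_type1_stream = spark.readStream.format("cloudFiles") \
                              .option("cloudFiles.format", "parquet") \
                              .option("cloudFiles.schemaLocation", "/mnt/checkpoints/type1") \
                              .load("/mnt/source_volume") \
                              .filter(col("file_type") == "type1")  # Assuming a column indicating file type

file_type2_stream = spark.readStream.format("cloudFiles") \
                              .option("cloudFiles.format", "parquet") \
                              .option("cloudFiles.schemaLocation", "/mnt/checkpoints/type2") \
                              .load("/mnt/source_volume") \
                              .filter(col("file_type") == "type2")

# toTable() starts each streaming query; every query needs its own checkpoint.
file_type1_stream.writeStream.format("delta") \
                            .option("checkpointLocation", "/mnt/checkpoints/type1") \
                            .toTable("catalog.volume.table_name1")

file_type2_stream.writeStream.format("delta") \
                            .option("checkpointLocation", "/mnt/checkpoints/type2") \
                            .toTable("catalog.volume.table_name2")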
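If the data has no column that identifies the file type, a possible variant is to filter at the source by file name with the pathGlobFilter option, so that each stream only ever sees one schema. The sketch below assumes the two kinds of files can be told apart by name; the glob patterns and the helper names (TABLES_BY_PATTERN, target_table, start_streams) are hypothetical, and the code has not been run against a real volume.

```python
from fnmatch import fnmatchcase

# Hypothetical routing: glob pattern on the file name -> target table.
TABLES_BY_PATTERN = {
    "type1*.parquet": "catalog.volume.table_name1",
    "type2*.parquet": "catalog.volume.table_name2",
}

def target_table(file_name, routes=TABLES_BY_PATTERN):
    """Local check of the routing: return the table whose pattern matches, else None."""
    for pattern, table in routes.items():
        if fnmatchcase(file_name, pattern):
            return table
    return None

def start_streams(spark, source_path, checkpoint_root, routes=TABLES_BY_PATTERN):
    """Start one Auto Loader stream per pattern, each with its own schema and checkpoint."""
    queries = []
    for pattern, table in routes.items():
        checkpoint = f"{checkpoint_root}/{table}"
        df = (spark.readStream.format("cloudFiles")
              .option("cloudFiles.format", "parquet")
              .option("cloudFiles.schemaLocation", checkpoint)
              .option("pathGlobFilter", pattern)  # only files whose name matches are ingested
              .load(source_path))
        # toTable() starts the query and returns its StreamingQuery handle.
        queries.append(
            df.writeStream
              .option("checkpointLocation", checkpoint)
              .toTable(table))
    return queries
```

The target_table helper is only there to try out the patterns on sample file names before starting the streams; the actual filtering is done by Auto Loader.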

 

Leave a like if this helps! Kudos,
Palash
