Is there a way to automate table creation in Databricks SQL based on an ADLS storage location that contains multiple Parquet files?

vasanthvk
New Contributor III

We have an ADLS container location that contains several (100+) different data subject folders, each containing Parquet files with a partition column, and we want to expose each data subject folder as a table in Databricks SQL. Is there any way to automate the creation of these tables?

abfss://landing@xyz.dfs.core.windows.net/sc/raw/DataSubject1/

abfss://landing@xyz.dfs.core.windows.net/sc/raw/DataSubject2/

abfss://landing@xyz.dfs.core.windows.net/sc/raw/DataSubject3/

abfss://landing@xyz.dfs.core.windows.net/sc/raw/DataSubject4/

....

abfss://landing@xyz.dfs.core.windows.net/sc/raw/DataSubject100/

I would like to automatically create tables DataSubject1, DataSubject2, DataSubject3, ..., DataSubject100 under a database (sqlanalytics_db) in Databricks SQL.

1 ACCEPTED SOLUTION

Accepted Solutions

dazfuller
Contributor III

So one way of doing this is to simply iterate over the directories at that level and create a table for each. Assuming that the schema is the same in all files, you could simply do something like

subjects = dbutils.fs.ls("abfss://landing@account.dfs.core.windows.net/sc/raw")

# Create one external Parquet table per top-level directory
for subject in subjects:
    table_name = subject.name.strip("/")
    spark.sql(f"CREATE TABLE IF NOT EXISTS `{table_name}` USING PARQUET LOCATION '{subject.path}'")


7 REPLIES

BilalAslamDbrx
Honored Contributor II

@Vasanth Kumar do you have new data arriving in these locations or not? In the case where you do NOT have new data arriving, you can simply run a COPY INTO command pointing to the location. Example:

CREATE TABLE DataSubject1;

COPY INTO DataSubject1
FROM 'abfss://landing@xyz.dfs.core.windows.net/sc/raw/DataSubject1'
FILEFORMAT = PARQUET
FORMAT_OPTIONS (
  'inferSchema' = 'true',
  'mergeSchema' = 'true'
);

Now that you can run this command for one storage path, you can template it to run for many storage paths. Probably the easiest way to do this is to use Python variable substitution to generate the SQL as a string and run it against a cluster.
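For illustration, a minimal sketch of that templating approach (assuming dbutils and spark are available in a notebook, that the folder names are valid table names, and noting the schema caveat mentioned in a reply further down):

# Generate and run a CREATE TABLE + COPY INTO statement for each data subject folder
base_path = "abfss://landing@xyz.dfs.core.windows.net/sc/raw"

for folder in dbutils.fs.ls(base_path):
    table_name = folder.name.strip("/")
    spark.sql(f"CREATE TABLE IF NOT EXISTS `{table_name}`")
    spark.sql(f"""
        COPY INTO `{table_name}`
        FROM '{folder.path}'
        FILEFORMAT = PARQUET
        FORMAT_OPTIONS ('inferSchema' = 'true', 'mergeSchema' = 'true')
    """)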

PS: Don't forget to set the OWNER of the newly-created tables, otherwise you won't see them in Databricks SQL (admins will see all newly-created tables).
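If it helps, one way to set the owner is an ALTER TABLE ... OWNER TO statement; the principal below is just a placeholder:

# Hand ownership of the new table to the user or group that should see it in Databricks SQL
spark.sql("ALTER TABLE DataSubject1 OWNER TO `someone@example.com`")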

vasanthvk
New Contributor III

Thank you for the guidance. To answer your question: yes, new data will be coming into the location.

BilalAslamDbrx
Honored Contributor II

@Vasanth Kumar a clarification to my original answer: you need to provide a schema to the CREATE TABLE statement; it doesn't work (except in some edge cases) without a schema.

The fact that you have new data incoming changes things. If your data were arriving in a single directory, it would be super trivial to load it using the Databricks Auto Loader. You just point Auto Loader to an ABFS path and it continuously loads data (either via direct file listing, or through a cloud queue).
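For illustration only, here is a minimal Auto Loader sketch for a single directory (the schema/checkpoint locations and target table name are placeholders I've made up; directory listing mode is assumed):

# Continuously pick up new Parquet files from one landing folder and append them to a table
(spark.readStream
    .format("cloudFiles")                     # Auto Loader source
    .option("cloudFiles.format", "parquet")   # format of the incoming files
    .option("cloudFiles.schemaLocation",
            "abfss://landing@xyz.dfs.core.windows.net/sc/_schemas/DataSubject1")
    .load("abfss://landing@xyz.dfs.core.windows.net/sc/raw/DataSubject1")
    .writeStream
    .option("checkpointLocation",
            "abfss://landing@xyz.dfs.core.windows.net/sc/_checkpoints/DataSubject1")
    .toTable("sqlanalytics_db.DataSubject1"))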

Let me dig into this scenario a bit more and come back to you ...

dazfuller
Contributor III

So one way of doing this is to simply iterate over the directories at that level and create a table for each. Assuming that the schema is the same in all files, you could simply do something like

subjects = dbutils.fs.ls("abfss://landing@account.dfs.core.windows.net/sc/raw")

# Create one external Parquet table per top-level directory
for subject in subjects:
    table_name = subject.name.strip("/")
    spark.sql(f"CREATE TABLE IF NOT EXISTS `{table_name}` USING PARQUET LOCATION '{subject.path}'")

vasanthvk
New Contributor III

Thank you for the code snippet. I was able to run this script with my file location and create multiple tables in one go!

User16857282152
Contributor

Updating dazfuller's suggestion, but including code for one level of partitioning.

Of course, if you have deeper partitions then you will have to make a function and do a recursive call to get to the final directory containing parquet files (see the sketch below).

Parquet will have the schema for everything but the partitions; you will have to add those yourself.

I chose string as the datatype for the partition columns. I'm not sure you could deduce more than that, and honestly not sure it matters in terms of Spark optimizations.
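For that deeper-partition case, a hypothetical recursive helper (names made up, assuming dbutils is available) could look like this; it walks down folders such as YEAR=2020/MONTH=01/ until it reaches the data files, collecting the partition column names on the way:

def find_data_dir(path, partition_columns=None):
    # Descend through partition-style subdirectories (name=value/) until a
    # directory containing data files is reached.
    partition_columns = partition_columns or []
    entries = dbutils.fs.ls(path)
    partition_dirs = [e for e in entries if e.name.endswith("/") and "=" in e.name]
    if not partition_dirs:
        return path, partition_columns  # this folder holds the parquet files
    first = partition_dirs[0]
    partition_columns.append(first.name.split("=")[0])
    return find_data_dir(first.path, partition_columns)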

Here is the code..

# create a working directory
 
data_path = "/tmp/directory_example"
# DELETE CONTENT 
# useful if run more than once or altered
dbutils.fs.rm(data_path, True)
dbutils.fs.mkdirs(data_path)

# create a database
database_name = "tomh_directory_test"
 
# CLEAR DATABASE IF ALREADY EXISTS
spark.sql(f"drop database IF EXISTS {database_name} cascade ")
spark.sql(f"create database {database_name}")
spark.sql(f"use {database_name}")

Create a dataframe to write sample content.

I partition in two different ways.

You might want to test this if you have deeper nested partitions, for example.

schema = "ID int, NAME String, DEPARTMENT String, YEAR string"
data = [[1,"Charles", "training","2020"],[2,"Evan", "sales", "2021"],[3,"Dorothy", "training", "2020"],[4,"Jason", "sales", "2021"]]
df = spark.createDataFrame(data, schema).coalesce(1)
display(df)

Write the data partitioned by department

table_name = "/table1"
partition_path = f"{data_path}{table_name}"
print(partition_path)
df.write.format("parquet").partitionBy("DEPARTMENT").save(partition_path)

Write the data partitioned by year

table_name = "/table2"
partition_path = f"{data_path}{table_name}"
print(partition_path)
df.write.format("parquet").partitionBy("YEAR").save(partition_path)

Current status

We have a directory with two subdirectories, each partitioned by a different column
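To sanity check that layout, you could, for example, list the working directory (data_path was defined above):

# Expect table1/ and table2/, each containing partition folders such as DEPARTMENT=sales/ or YEAR=2020/
display(dbutils.fs.ls(data_path))
display(dbutils.fs.ls(f"{data_path}/table1"))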

Next steps

Write some code to generate statements similar to this

create table table1 (ID int, NAME string, DEPARTMENT STRING, YEAR STRING) USING PARQUET partitioned by (DEPARTMENT) location "/tmp/directory_example/table1";

It would be up to you if you wanted to add an "if not exists" clause to the create table statements; that way only new directories would have tables created.

Table Names

I use the directory name for the table name.

Partitions

If the parquet files have subdirectories, those will be our partitions. I am only going one level deep here; you may need to recurse until you hit the data files if some tables have multiple partition levels.

Table Location

This will be the same directory as the one used for the table name.

Table Schema

The table schema will be a combination of a schema generated by reading one of the partition folders, plus the partition column added on top.

Note: if empty partitions exist you will have to catch that and read another partition.

Create table

We will use external tables; by defining the location, the tables are external.

MSCK

The code runs MSCK REPAIR TABLE to update the metastore so it registers the partitions.

basedir = dbutils.fs.ls(data_path)
#print(basedir)
#partition_list = []
for x in basedir:
  if x[1].endswith("/"): # has a subdirectory
    # Use directory name for table name
    # basedir is a list of row objects
    # item at index 1 x[1] is file name
    table_name = x[1][:-1] # remove last character /
    print(f"table name: {table_name}")
    
    # Get table location from same list of file objects
    # x[0] is full path
    
    table_location = x[0]
    print(f"table location: {table_location}")
    
    # To get the partition column get list of row objects 
    # from dbutils.fs on the subdirectory
    # Same logic as above, get directory name and parse for name of partition column
    # this will be a directory with parquet files; you will need a recursive call if sub-partitions are used
    first_partition = dbutils.fs.ls(x[0])[0]
    path_to_one_subdirectory = first_partition[0]
    partition_column = first_partition[1].split("=")[0]
    print(f"partition column is: {partition_column}")
    
    # Get the base schema
    # Create a dataframe from the data directory, the one that holds the parquet files
    df = spark.read.format("parquet").load(path_to_one_subdirectory)
    # simpleString() returns e.g. "struct<ID:int,NAME:string>"; strip the
    # "struct<" prefix and trailing ">" and turn "name:type" into "name type"
    base_schema = df.schema.simpleString()[7:-1].replace(":", " ")
  
    # Add the partition column
    full_schema = f"{base_schema} , {partition_column} string"
    #print("######")
    print(f"full schema is: {full_schema}")
    #print("#######")
    
    # Build the create table statements
    create_table_statement = f"create table {table_name} ({full_schema}) USING PARQUET partitioned by ({partition_column})  location '{table_location}'"
    print(f"Create table statement: {create_table_statement}")
    
    # create the table
    spark.sql(create_table_statement)
    spark.sql(f"msck repair table {table_name}")
    print("----")

At the end of that, you should have table1 and table2 in the current database.
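For example, a quick way to confirm is to list the tables registered in the demo database before querying them:

# Expect table1 and table2 in the listing
display(spark.sql(f"SHOW TABLES IN {database_name}"))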

%sql
 
SELECT * from table2;
%sql
 
SELECT * from table1;

To clean up after this code, if you ran it all as an experiment, do this.

#cleanup
 
dbutils.fs.rm(data_path, True)
#cleanup
 
spark.sql("drop database tomh_directory_test cascade ")

I have attached a notebook of this demo.

Thank you for the detailed explanation along with the example code, appreciate it!
