User16857282152
Databricks Employee

Building on dazfuller's suggestion, here is code that handles one level of partitioning.

If you have deeper partitions, you will have to write a function that calls itself recursively to reach the final directory containing the parquet files.
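A minimal sketch of that recursive descent, in plain Python so the logic can be tried without a cluster. It assumes a listing function that returns `(path, name)` pairs, as the first two fields of the `dbutils.fs.ls` rows do; `find_data_dir`, `is_partition_dir` and `fake_tree` are hypothetical names for illustration only.

```python
from typing import Callable, List, Tuple

# Hypothetical listing format: (path, name) pairs, mirroring the first two
# fields of the FileInfo rows returned by dbutils.fs.ls.
Listing = List[Tuple[str, str]]


def is_partition_dir(name: str) -> bool:
    """Hive-style partition folders are named like 'COLUMN=value/'."""
    return name.endswith("/") and "=" in name


def find_data_dir(path: str, list_dir: Callable[[str], Listing]) -> Tuple[str, List[str]]:
    """Follow the first partition folder at each level until none remain.

    Returns (leaf_path, partition_columns): leaf_path is the directory that
    holds the Parquet files; partition_columns lists columns outermost first.
    """
    partition_dirs = [(p, n) for p, n in list_dir(path) if is_partition_dir(n)]
    if not partition_dirs:
        # No key=value subfolders left, so this directory holds the data files.
        return path, []
    child_path, child_name = partition_dirs[0]
    column = child_name.split("=", 1)[0]
    leaf_path, deeper = find_data_dir(child_path, list_dir)
    return leaf_path, [column] + deeper


# In-memory stand-in for a table partitioned by YEAR, then DEPARTMENT.
fake_tree = {
    "/t/": [("/t/YEAR=2020/", "YEAR=2020/"), ("/t/_SUCCESS", "_SUCCESS")],
    "/t/YEAR=2020/": [("/t/YEAR=2020/DEPARTMENT=sales/", "DEPARTMENT=sales/")],
    "/t/YEAR=2020/DEPARTMENT=sales/": [
        ("/t/YEAR=2020/DEPARTMENT=sales/part-0.parquet", "part-0.parquet"),
    ],
}

print(find_data_dir("/t/", lambda p: fake_tree[p]))
# ('/t/YEAR=2020/DEPARTMENT=sales/', ['YEAR', 'DEPARTMENT'])
```

On Databricks you would pass something like `lambda p: [(f.path, f.name) for f in dbutils.fs.ls(p)]` as `list_dir`. Following only the first folder at each level assumes every branch has the same partition columns, which is what `partitionBy` produces.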

The Parquet files carry the schema for every column except the partition columns, so you will have to add those yourself.

I typed the partition columns as strings. I'm not sure you could infer anything more specific, and honestly I'm not sure it matters for Spark optimizations.

Here is the code.

# create a working directory
 
data_path = "/tmp/directory_example"
# DELETE CONTENT 
# useful if run more than once or altered
dbutils.fs.rm(data_path, True)
dbutils.fs.mkdirs(data_path)

# create a database
database_name = "tomh_directory_test"
 
# CLEAR DATABASE IF ALREADY EXISTS
spark.sql(f"drop database IF EXISTS {database_name} cascade ")
spark.sql(f"create database {database_name}")
spark.sql(f"use {database_name}")

Create a DataFrame with sample content.

I write it out partitioned in two different ways.

You might want to test more deeply nested partitions too, for example.

schema = "ID int, NAME String, DEPARTMENT String, YEAR string"
data = [[1,"Charles", "training","2020"],[2,"Evan", "sales", "2021"],[3,"Dorothy", "training", "2020"],[4,"Jason", "sales", "2021"]]
df = spark.createDataFrame(data, schema).coalesce(1)
display(df)

Write the data partitioned by department

table_name = "/table1"
partition_path = f"{data_path}{table_name}"
print(partition_path)
df.write.format("parquet").partitionBy("DEPARTMENT").save(partition_path)

Write the data partitioned by year

table_name = "/table2"
partition_path = f"{data_path}{table_name}"
print(partition_path)
df.write.format("parquet").partitionBy("YEAR").save(partition_path)

Current status

We now have a directory with two subdirectories, each partitioned by a different column.
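To make the resulting layout concrete: `partitionBy` writes one Hive-style `COLUMN=value` folder per distinct value, and the partition column itself is not stored inside the Parquet files. The sketch below, a hypothetical helper named `partition_dirs` that assumes plain values needing no escaping, computes the folder names the two writes above should produce. The real directories may also contain bookkeeping files such as `_SUCCESS`.

```python
# Sample rows copied from the DataFrame above: (ID, NAME, DEPARTMENT, YEAR)
rows = [
    (1, "Charles", "training", "2020"),
    (2, "Evan", "sales", "2021"),
    (3, "Dorothy", "training", "2020"),
    (4, "Jason", "sales", "2021"),
]
columns = ["ID", "NAME", "DEPARTMENT", "YEAR"]


def partition_dirs(rows, columns, partition_col):
    """Return the sorted 'COL=value/' folder names a partitionBy(partition_col)
    write would create (sketch; assumes values need no path escaping)."""
    idx = columns.index(partition_col)
    return sorted({f"{partition_col}={row[idx]}/" for row in rows})


print(partition_dirs(rows, columns, "DEPARTMENT"))  # table1
# ['DEPARTMENT=sales/', 'DEPARTMENT=training/']
print(partition_dirs(rows, columns, "YEAR"))        # table2
# ['YEAR=2020/', 'YEAR=2021/']
```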

Next steps

Write some code to generate statements similar to this one:

create table table1 (ID int, NAME string, DEPARTMENT STRING, YEAR STRING) USING PARQUET partitioned by (DEPARTMENT) location "/tmp/directory_example/table1";

It is up to you whether to add an IF NOT EXISTS clause to the create table statements. With it, re-running the code creates tables only for new directories.
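A small sketch of generating such a statement, with the IF NOT EXISTS clause as an option. `build_create_table` is a hypothetical helper; it types every partition column as STRING, as in the post, and expects `base_schema` to be the `name type, ...` string read from the Parquet files.

```python
def build_create_table(table_name, base_schema, partition_cols, location,
                       if_not_exists=False):
    """Assemble a CREATE TABLE statement for an external Parquet table.

    Partition columns are appended to the schema as STRING and listed in
    PARTITIONED BY; the LOCATION clause points at the existing directory.
    """
    partition_defs = ", ".join(f"{c} STRING" for c in partition_cols)
    clause = "IF NOT EXISTS " if if_not_exists else ""
    return (
        f"CREATE TABLE {clause}{table_name} ({base_schema}, {partition_defs}) "
        f"USING PARQUET PARTITIONED BY ({', '.join(partition_cols)}) "
        f"LOCATION '{location}'"
    )


stmt = build_create_table(
    "table1",
    "ID int, NAME string, YEAR string",  # columns stored in the Parquet files
    ["DEPARTMENT"],
    "/tmp/directory_example/table1",
    if_not_exists=True,
)
print(stmt)
```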

Table Names

I use the directory name as the table name.

Partitions

If the table directory has subdirectories, those are our partitions. I am only going one level deep here; if some tables have multiple partition columns, you may need to recurse until you reach the data files.
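Once you have reached a data directory, the partition columns and their values can be read straight off the path. This is a sketch with a hypothetical `parse_partitions` helper; it only looks below the table location, so a `=` elsewhere in the base path is not mistaken for a partition, and it keeps values as strings to match the STRING partition type used here.

```python
def parse_partitions(table_location, leaf_path):
    """Return [(column, value), ...] for each 'COLUMN=value' folder between
    table_location and leaf_path. Values are left as strings."""
    if leaf_path.startswith(table_location):
        relative = leaf_path[len(table_location):]
    else:
        relative = leaf_path
    pairs = []
    for segment in relative.strip("/").split("/"):
        if "=" in segment:
            column, value = segment.split("=", 1)
            pairs.append((column, value))
    return pairs


print(parse_partitions("dbfs:/tmp/directory_example/table1/",
                       "dbfs:/tmp/directory_example/table1/DEPARTMENT=sales/"))
# [('DEPARTMENT', 'sales')]
```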

Table Location

The table location is the same directory that gives the table its name.
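Both the table name and the table location come from one listing entry. Below, `Entry` is a hypothetical stand-in for the row objects `dbutils.fs.ls` returns (the code in this post reads them by index, `x[0]` for the path and `x[1]` for the name), and `table_name_and_location` is an illustrative helper, not part of any library.

```python
from typing import NamedTuple, Optional, Tuple


class Entry(NamedTuple):
    """Hypothetical stand-in for a dbutils.fs.ls row: index 0 path, index 1 name."""
    path: str
    name: str


def table_name_and_location(entry: Entry) -> Optional[Tuple[str, str]]:
    """Directory names end in '/'. The name without that slash becomes the
    table name; the full path becomes the table LOCATION."""
    if not entry.name.endswith("/"):
        return None  # a plain file, not a table directory
    return entry.name[:-1], entry.path


print(table_name_and_location(Entry("dbfs:/tmp/directory_example/table1/", "table1/")))
# ('table1', 'dbfs:/tmp/directory_example/table1/')
```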

Table Schema

The table schema combines the schema read from one of the partition folders with the partition column.
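The code in this post builds the base schema by slicing `df.schema.simpleString()`. A variant, sketched here as a hypothetical `full_schema` helper, works from `(name, type)` pairs instead; in PySpark those could come from `[(f.name, f.dataType.simpleString()) for f in df.schema.fields]`. It appends each partition column as `string` and skips it if the files somehow already contain a column of that name (compared case-insensitively, as Spark does by default).

```python
def full_schema(base_fields, partition_cols):
    """Join (name, type) pairs into 'name type, ...' and append each partition
    column as string unless a column with that name already exists."""
    existing = {name.lower() for name, _ in base_fields}
    cols = [f"{name} {dtype}" for name, dtype in base_fields]
    cols += [f"{c} string" for c in partition_cols if c.lower() not in existing]
    return ", ".join(cols)


print(full_schema([("ID", "int"), ("NAME", "string"), ("YEAR", "string")], ["DEPARTMENT"]))
# ID int, NAME string, YEAR string, DEPARTMENT string
```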

Note: if empty partitions exist, you will have to catch that and read another partition.
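One way to handle that, as a sketch: try the partition folders in order and keep the first one whose schema can be read. `first_non_empty` is a hypothetical helper, and `read_schema` is a caller-supplied function; on Databricks it might be `lambda p: spark.read.format("parquet").load(p).schema`, which raises an error (typically an `AnalysisException` saying the schema cannot be inferred) when a folder holds no Parquet files.

```python
def first_non_empty(partition_paths, read_schema):
    """Return (path, schema) for the first partition folder whose schema can be
    read. read_schema(path) should return a schema or raise if the folder is
    empty. Raises ValueError if no folder is readable."""
    failures = []
    for path in partition_paths:
        try:
            return path, read_schema(path)
        except Exception as exc:  # broad on purpose: the reader's error type varies
            failures.append((path, exc))
    raise ValueError(f"no readable partition among {len(failures)} candidates")


# Usage with a fake reader: the first folder is "empty", the second is not.
def fake_reader(path):
    if path.endswith("DEPARTMENT=empty/"):
        raise RuntimeError("no parquet files")
    return "ID int, NAME string, YEAR string"


print(first_non_empty(["/t/DEPARTMENT=empty/", "/t/DEPARTMENT=sales/"], fake_reader))
```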

Create table

We use external tables: because each statement specifies a location, the tables are external.

MSCK

The code then runs MSCK REPAIR TABLE so the metastore picks up the partitions that already exist on disk.

basedir = dbutils.fs.ls(data_path)
for x in basedir:
  if x[1].endswith("/"): # has a subdirectory
    # Use directory name for table name
    # basedir is a list of row objects
    # item at index 1 x[1] is file name
    table_name = x[1][:-1] # remove last character /
    print(f"table name: {table_name}")
    
    # Get table location from same list of file objects
    # x[0] is full path
    
    table_location = x[0]
    print(f"table location: {table_location}")
    
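    # To get the partition column, list the table directory with dbutils.fs
    # and keep only Hive-style partition folders (names like "DEPARTMENT=sales/").
    # Filtering skips bookkeeping files such as _SUCCESS, which would
    # otherwise be picked up if they happened to be listed first.
    partition_dirs = [f for f in dbutils.fs.ls(x[0]) if f[1].endswith("/") and "=" in f[1]]
    if not partition_dirs:
      print("no partition folders found, skipping")
      continue
    # This is a directory with parquet files; you will need a recursive call if sub-partitions are used
    path_to_one_subdirectory = partition_dirs[0][0]
    partition_column = partition_dirs[0][1].split("=")[0]
    print(f"partition column is: {partition_column}")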
    
    # Get the base schema
    # Create a dataframe from the data directories, the ones that hold parquet files
    df = spark.read.format("parquet").load(path_to_one_subdirectory)
    # simpleString() returns "struct<ID:int,NAME:string,...>": drop "struct<" and ">"
    # and turn each "name:type" into "name type"
    base_schema = df.schema.simpleString()[7:-1].replace(":"," ")
  
    # Add the partition column
    full_schema = f"{base_schema} , {partition_column} string"
    print(f"full schema is: {full_schema}")
    
    # Build the create table statements
    create_table_statement = f"create table {table_name} ({full_schema}) USING PARQUET partitioned by ({partition_column})  location '{table_location}'"
    print(f"Create table statement: {create_table_statement}")
    
    # create the table
    spark.sql(create_table_statement)
    spark.sql(f"msck repair table {table_name}")
    print("----")
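If you would rather review the SQL before anything is created, the same loop can be written as a dry run that returns the statements instead of executing them. This is a sketch in plain Python: `plan_tables` is a hypothetical name, `list_dir` is assumed to return `(path, name)` pairs like `dbutils.fs.ls`, and `base_schema_of` is an assumed hook standing in for the `spark.read` step. Like the code above, it handles only one partition level.

```python
from typing import Callable, List, Tuple

Listing = List[Tuple[str, str]]  # (path, name) pairs, like dbutils.fs.ls rows


def plan_tables(base: str,
                list_dir: Callable[[str], Listing],
                base_schema_of: Callable[[str], str]) -> List[str]:
    """Return the create/msck statements the loop above would run."""
    statements = []
    for path, name in list_dir(base):
        if not name.endswith("/"):
            continue  # skip plain files at the top level
        table_name = name[:-1]
        # Only key=value folders are partitions; this skips _SUCCESS and similar.
        parts = [(p, n) for p, n in list_dir(path) if n.endswith("/") and "=" in n]
        if not parts:
            continue  # unpartitioned or empty: not handled in this sketch
        part_path, part_name = parts[0]
        column = part_name.split("=")[0]
        schema = f"{base_schema_of(part_path)}, {column} string"
        statements.append(
            f"create table {table_name} ({schema}) USING PARQUET "
            f"partitioned by ({column}) location '{path}'"
        )
        statements.append(f"msck repair table {table_name}")
    return statements


# In-memory stand-in for the directory tree.
tree = {
    "/d/": [("/d/table1/", "table1/")],
    "/d/table1/": [("/d/table1/DEPARTMENT=sales/", "DEPARTMENT=sales/"),
                   ("/d/table1/_SUCCESS", "_SUCCESS")],
}
plan = plan_tables("/d/", lambda p: tree[p], lambda p: "ID int, NAME string, YEAR string")
for statement in plan:
    print(statement)
```

Once the printed statements look right, each one can be passed to `spark.sql`.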

After that runs, you should have table1 and table2 in the current database.

%sql
 
SELECT * from table2;

%sql
 
SELECT * from table1;

To clean up, if you ran all of this as an experiment, run the following.

#cleanup
 
dbutils.fs.rm(data_path, True)
#cleanup
 
spark.sql("drop database tomh_directory_test cascade ")

I have attached a notebook of this demo.