Here is the solution I ended up with. Any suggestions for improvements are welcome!
I created a helper function:
```python
import logging
import uuid
from typing import Dict

from pyspark.sql import SparkSession

logging.basicConfig()
LOGGER = logging.getLogger(__name__)


def create_liquid_clustered_delta_table(location: str, clustercols: Dict[str, str]) -> str:
    """Use Spark SQL to create a new liquid clustered Delta table at `location`.

    There seems to be no way to create a Delta table with liquid clustering using the
    normal PySpark write API, so we create it as an external table in SQL and
    immediately drop it. This leaves the Delta table at `location`, with the schema
    given by `clustercols`, which must be a dict mapping column names to their types.

    Hint: `clustercols` does not need to contain the full schema if you use schema
    evolution later when writing to the Delta table.

    Returns the CREATE statement that was executed.
    """
    spark = SparkSession.builder.getOrCreate()
    # Random name for the throwaway metastore table (same as str(uuid1()) without dashes)
    temp_tbl_name = "tmp" + uuid.uuid1().hex
    type_spec = ", ".join(f"{col} {typ}" for col, typ in clustercols.items())
    column_names = ", ".join(clustercols.keys())
    create_str = (
        f"CREATE EXTERNAL TABLE {temp_tbl_name}({type_spec}) "
        f"CLUSTER BY ({column_names}) LOCATION '{location}'"
    )
    LOGGER.debug(
        "Creating table with tmp name %s located at %s by executing '%s'",
        temp_tbl_name,
        location,
        create_str,
    )
    spark.sql(create_str)
    # Re-apply the clustering columns explicitly on the new table
    spark.sql(f"ALTER TABLE {temp_tbl_name} CLUSTER BY ({column_names})")
    # Dropping an EXTERNAL table removes only the metastore entry; the Delta files stay at `location`
    spark.sql(f"DROP TABLE {temp_tbl_name}")
    LOGGER.debug(
        "Dropped tmp table %s, leaving delta table at external location %s",
        temp_tbl_name,
        location,
    )
    return create_str
```
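One possible improvement is to pull the SQL string building out of the Spark-dependent function so it can be unit-tested without a cluster. The sketch below is a hypothetical helper (`build_clustered_ddl` and `quote_identifier` are my names, not part of any library). It assumes you want column and table names wrapped in Spark SQL backticks (with embedded backticks doubled), and it simply rejects locations containing a single quote rather than trying to escape them.

```python
from typing import Dict


def quote_identifier(name: str) -> str:
    """Wrap a name in backticks, doubling any embedded backticks (Spark SQL identifier quoting)."""
    return "`" + name.replace("`", "``") + "`"


def build_clustered_ddl(table_name: str, location: str, clustercols: Dict[str, str]) -> str:
    """Hypothetical helper: build the CREATE EXTERNAL TABLE ... CLUSTER BY statement as a plain string."""
    if not clustercols:
        raise ValueError("clustercols must contain at least one column")
    if "'" in location:
        # A single quote would terminate the SQL string literal early; refuse instead of escaping.
        raise ValueError("location must not contain single quotes")
    type_spec = ", ".join(f"{quote_identifier(col)} {typ}" for col, typ in clustercols.items())
    column_names = ", ".join(quote_identifier(col) for col in clustercols)
    return (
        f"CREATE EXTERNAL TABLE {quote_identifier(table_name)} ({type_spec}) "
        f"CLUSTER BY ({column_names}) LOCATION '{location}'"
    )


# Example path and table name are made up for illustration.
print(build_clustered_ddl("tmp123", "/mnt/gold/events", {"Id": "string", "SourceTimestamp": "timestamp"}))
```

`create_liquid_clustered_delta_table` could then call this helper and pass the result to `spark.sql`, keeping the Spark part of the function as thin as possible.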
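Then, in my PySpark streams, I handle it like this:

```python
from delta.tables import DeltaTable

# `spark`, `gold_path` and `write_stream` (a DataStreamWriter for the Delta sink) are defined elsewhere
if DeltaTable.isDeltaTable(spark, gold_path):
    write_stream.start(gold_path)
else:
    # First run: create the clustered table with only the clustering columns...
    create_liquid_clustered_delta_table(
        gold_path, {"Id": "string", "SourceTimestamp": "timestamp"}
    )
    # ...and let mergeSchema add the remaining columns on the first write
    write_stream.option("mergeSchema", "true").start(gold_path)
```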
The `mergeSchema` option is important on that first run: the freshly created liquid clustered Delta table won't have the full schema, only the columns passed to the function, so the first write has to add the rest.
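To make the first-run behaviour concrete, here is a deliberately simplified model of what schema merging does to the table's columns. It is not Delta Lake's implementation: `merge_schemas` is a hypothetical function, schemas are modelled as plain `{column: type}` dicts, and it ignores details such as case-insensitive column matching and the limited type changes Delta allows. The idea is that existing columns keep their position, new incoming columns are appended, and a conflicting type is an error.

```python
from typing import Dict


def merge_schemas(table_schema: Dict[str, str], incoming_schema: Dict[str, str]) -> Dict[str, str]:
    """Simplified model of schema merging: keep existing columns, append new ones, reject type conflicts."""
    merged = dict(table_schema)  # existing columns keep their order (dicts preserve insertion order)
    for column, column_type in incoming_schema.items():
        if column not in merged:
            merged[column] = column_type  # new columns are appended at the end
        elif merged[column] != column_type:
            raise ValueError(f"type conflict for {column!r}: {merged[column]} vs {column_type}")
    return merged


# The table created by the helper only knows the clustering columns...
table = {"Id": "string", "SourceTimestamp": "timestamp"}
# ...while the stream carries additional (made-up) columns.
incoming = {"Id": "string", "SourceTimestamp": "timestamp", "Value": "double"}
print(merge_schemas(table, incoming))
```

Without `mergeSchema`, the equivalent of the "append new columns" branch is not allowed, which is why the first write against the minimal table would fail.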