
Liquid clustering with structured streaming pyspark

Erik
Valued Contributor III

I would like to try out liquid clustering, but all the examples I see seem to be SQL tables created by selecting from other tables. Our gold tables are written straight to a path with the PySpark writeStream API, e.g. like this:

 

silver_df.writeStream.partitionBy(["year", "month"]).format("delta").outputMode(
    "append"
).option("checkpointLocation", calculate_checkpoint_location(gold_path)).trigger(
    once=True
).start(
    gold_path
).awaitTermination()

Is there any way to enable liquid clustering for a delta table created like this?
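
For context, the examples I keep finding are all roughly of this shape, i.e. clustering declared when the table is created from a SELECT over another table (just a sketch of that pattern; the table and column names are made up):

# CTAS-style pattern from the docs: CLUSTER BY is declared when the table is
# created by selecting from another table. Names here are placeholders.
spark.sql("""
    CREATE TABLE my_gold_table
    CLUSTER BY (SourceTimestamp, Id)
    AS SELECT * FROM my_silver_table
""")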

 

3 REPLIES

Erik
Valued Contributor III

@Retired_mod Thanks for your reply. I know I can't use it together with partitioning or Z-ordering, that's OK. What I wonder about is whether it's possible to create a liquid-clustered Delta table using the PySpark writeStream API, for example something like this:

silver_df.writeStream.clusterBy(["timestamp", "id"]).format("delta").outputMode(
    "append"
).option("checkpointLocation", calculate_checkpoint_location(gold_path)).trigger(
    once=True
).start(
    gold_path
).awaitTermination()

 

Erik
Valued Contributor III

Here is the solution I ended up with. Any suggestions for improvements are welcome!

I created a helper function:

import logging
import uuid
from typing import Dict

from pyspark.sql import SparkSession

logging.basicConfig()
LOGGER = logging.getLogger(__name__)


def create_liquid_clustered_delta_table(location: str, clustercols: Dict[str, str]):
    """Uses Spark SQL to create a new liquid-clustered Delta table at location.

    There seems to be no way to create a Delta table with liquid clustering using the normal
    PySpark write API, so we create it as an external table in SQL, which we immediately drop.
    This leaves the Delta table at location, with the schema given by "clustercols", which needs
    to be a dict from column names to their types.

    Hint: clustercols does not need to contain the full schema if you use schema evolution later
    when writing to the Delta table.
    """
    spark = SparkSession.builder.getOrCreate()
    temp_tbl_name = "tmp" + str(uuid.uuid1()).replace("-", "")
    type_spec = ", ".join([k + " " + v for k, v in clustercols.items()])
    columnnames = ", ".join(clustercols.keys())
    create_str = f"CREATE EXTERNAL TABLE {temp_tbl_name}({type_spec}) CLUSTER BY ({columnnames}) LOCATION '{location}'"

    LOGGER.debug(
        "Creating table with tmp name %s located at %s by executing '%s'",
        temp_tbl_name,
        location,
        create_str,
    )
    spark.sql(create_str)
    spark.sql(f"ALTER TABLE {temp_tbl_name} CLUSTER BY ({columnnames})")

    # Dropping the table keeps the Delta files, since it was created as an external table.
    drop_str = f"DROP TABLE {temp_tbl_name}"
    spark.sql(drop_str)
    LOGGER.debug(
        "Dropped tmp table %s, leaving delta table at external location %s",
        temp_tbl_name,
        location,
    )
    return create_str

And then in my PySpark streams I handle it like this:

from delta.tables import DeltaTable

# write_stream is the DataStreamWriter configured as shown further up
if DeltaTable.isDeltaTable(spark, gold_path):
    write_stream.start(gold_path)
else:
    create_liquid_clustered_delta_table(
        gold_path, {"Id": "string", "SourceTimestamp": "timestamp"}
    )
    write_stream.option("mergeSchema", "true").start(gold_path)

The "mergeSchema" is important for that first run, since the liquid clustered delta table wont have the full schema, only the columns given in the function call.

-werners-
Esteemed Contributor III

I did not find anything in the docs either. I suppose a PySpark version will come in the future?
