
Liquid clustering with Structured Streaming (PySpark)

Erik
Valued Contributor II

I would like to try out liquid clustering, but all the examples I see are SQL tables created by selecting from other tables. Our gold tables are written directly to a path with PySpark Structured Streaming, e.g. like this:

 

(
    silver_df.writeStream
    .partitionBy("year", "month")
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", calculate_checkpoint_location(gold_path))
    .trigger(once=True)
    .start(gold_path)
    .awaitTermination()
)

Is there any way to enable liquid clustering for a delta table created like this?

 


Kaniz_Fatma
Community Manager

Hi @Erik, it appears that liquid clustering needs to be enabled when first creating a table, and it's not compatible with partitioning or ZORDER. In your case, you are writing your data to a Delta table using PySpark's writeStream API with partitioning.
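
For reference, the documented path is to declare the clustering in the DDL when the table is first created. A minimal sketch (the table and column names here are placeholders, not your actual schema):

spark.sql("""
    CREATE TABLE my_gold_table (
        id STRING,
        ts TIMESTAMP
    )
    USING DELTA
    CLUSTER BY (ts, id)
""")

Once a table like this exists, a stream can append to it as usual.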

Erik
Valued Contributor II

@Kaniz_Fatma Thanks for your reply. I know I can't use it with partitioning or ZORDER, that's OK. What I wonder about is whether it's possible to create a liquid clustered Delta table using the PySpark writeStream API, for example something like this:

(
    silver_df.writeStream
    .clusterBy("timestamp", "id")
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", calculate_checkpoint_location(gold_path))
    .trigger(once=True)
    .start(gold_path)
    .awaitTermination()
)

 

Erik
Valued Contributor II

Here is the solution I ended up with. Any suggestions for improvements are welcome!

I created a helper function:

import logging
import uuid
from typing import Dict

from pyspark.sql import SparkSession

logging.basicConfig()
LOGGER = logging.getLogger(__name__)


def create_liquid_clustered_delta_table(location: str, clustercols: Dict[str, str]):
    """Uses Spark SQL to create a new liquid clustered Delta table at location.

    There seems to be no way to create a Delta table with liquid clustering using the
    normal PySpark write API, so we create it as an external table in SQL, which we
    immediately drop. This leaves the Delta table at location, with the schema given
    by "clustercols", which needs to be a dict from column names to their types.

    Hint: clustercols does not need to contain the full schema if you use schema
    evolution later when writing to the Delta table.
    """
    spark = SparkSession.builder.getOrCreate()
    # Unique throwaway name: the table entry is temporary, only the files
    # written to `location` matter.
    temp_tbl_name = "tmp" + str(uuid.uuid1()).replace("-", "")
    type_spec = ", ".join(f"{k} {v}" for k, v in clustercols.items())
    columnnames = ", ".join(clustercols.keys())
    create_str = (
        f"CREATE EXTERNAL TABLE {temp_tbl_name}({type_spec}) "
        f"CLUSTER BY ({columnnames}) LOCATION '{location}'"
    )

    LOGGER.debug(
        "Creating table with tmp name %s located at %s by executing '%s'",
        temp_tbl_name,
        location,
        create_str,
    )
    spark.sql(create_str)
    spark.sql(f"ALTER TABLE {temp_tbl_name} CLUSTER BY ({columnnames})")

    # Dropping the external table removes only the metastore entry; the Delta
    # files stay at `location`.
    spark.sql(f"DROP TABLE {temp_tbl_name}")
    LOGGER.debug(
        "Dropped tmp table %s, leaving delta table at external location %s",
        temp_tbl_name,
        location,
    )
    return create_str

And then in my PySpark streams I handle it like this (write_stream is the configured DataStreamWriter from above, without the .start() call):

from delta.tables import DeltaTable

if DeltaTable.isDeltaTable(spark, gold_path):
    write_stream.start(gold_path)
else:
    # First run: create the table with only the clustering columns, then let
    # mergeSchema fill in the rest of the schema on the initial write.
    create_liquid_clustered_delta_table(
        gold_path, {"Id": "string", "SourceTimestamp": "timestamp"}
    )
    write_stream.option("mergeSchema", "true").start(gold_path)

The "mergeSchema" option is important for that first run, since the liquid clustered Delta table won't have the full schema yet, only the columns given in the function call.
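
If you want to sanity-check that the schema evolution worked, one quick way (just a sketch, reusing gold_path from above) is to print the table's schema after that first write:

# After the first mergeSchema write this should show the full silver schema,
# not just the Id and SourceTimestamp columns the helper created.
spark.read.format("delta").load(gold_path).printSchema()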

-werners-
Esteemed Contributor III

I did not find anything in the docs either. I suppose a PySpark version will come in the future?
