
Replay stream to migrate to liquid cluster

EndreM
New Contributor III

The documentation is sparse about how to migrate a partitioned table to liquid clustering, as the ALTER TABLE statement suggested in the documentation doesn't work when it's a partitioned table.

The comments on this forum suggest replaying the stream, and this is what I'm trying to do. We have data in our bronze table that contains JSON records; one record is transformed into an estimated 200,000 records. This should be easily handled by any compute, but even with the largest compute (384 GB) we get large amounts of garbage collection and eventually the driver restarts (or, with G1GC, it times out). So I have limited the transformation to produce only 100 records. This was successful, but I see an excessive number of commit and offset files: it produced 53 of them.

This stream worked when the silver table was partitioned. But now I have enabled Unity Catalog and the stream fails. There is no easy way to disable Unity Catalog, so I need to work out why the stream from the partitioned bronze table to the liquid clustered silver table fails. Any suggestions?

I have tried maxing out memory, splitting the transformation into 5 steps, and caching between each step. The transformation converts a large JSON file into multiple records. I tried switching to the G1GC garbage collector, but only limiting the number of records being transformed worked, and since that discards some records it is not a good solution.

1 REPLY

Louis_Frolio
Databricks Employee

Greetings @EndreM, I did some digging internally and came up with some tips and tricks to guide you through this issue:

Based on your situation, you're encountering several common challenges when migrating a partitioned table to liquid clustering with streaming replay. Let me provide a comprehensive response addressing the memory issues, excessive commit files, and Unity Catalog complications.

Understanding the Migration Limitations

The `ALTER TABLE CLUSTER BY` command doesn't work on partitioned tables because liquid clustering and partitioning are mutually exclusive features in Delta Lake. You must fully migrate from one approach to the other.
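For example, the in-place conversion you likely attempted fails on a partitioned table (table and column names below are illustrative):

```python
# On a partitioned Delta table this raises an error (exact message varies by
# runtime) because CLUSTER BY can only be applied to an unpartitioned table.
spark.sql("ALTER TABLE silver CLUSTER BY (event_date, customer_id)")
```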

Root Causes of Your Memory Issues

Your memory problems stem from several factors:

Data explosion ratio: Transforming 1 bronze record into 200,000 silver records creates massive memory pressure during the explode/transformation operations, especially when the driver needs to track streaming state and metadata (see the sketch after this list).

Streaming checkpoint overhead: The excessive commit and offset files (53 files you mentioned) indicate that your streaming job is creating too many small batches, likely due to memory constraints forcing frequent micro-batch completions. Each micro-batch generates commit and offset metadata, causing checkpoint bloat.

Unity Catalog credential overhead: The Unity Catalog integration adds authentication and permission checking overhead that wasn't present in your previous setup, compounding memory usage.
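To make the first point concrete, the explode pattern under discussion looks roughly like this (the schema and column names are assumptions, not your actual code):

```python
from pyspark.sql.functions import explode, from_json
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Hypothetical schema: each bronze row carries a JSON document with a huge array
schema = StructType([StructField("items", ArrayType(StringType()))])

# One input row fans out to ~200,000 output rows; every exploded row also
# carries the parent row's columns, multiplying the in-memory footprint
exploded = (bronze_df
    .withColumn("parsed", from_json("json_payload", schema))
    .select(explode("parsed.items").alias("item")))
```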

Recommended Solutions

Approach 1: Optimize the Streaming Transformation

Rather than limiting output records (which discards data), implement these optimizations:

Cap maxFilesPerTrigger and tune batch sizing:
```python
.option("maxFilesPerTrigger", 10) # Start small and increase gradually
.trigger(processingTime='5 minutes') # Give more time per batch
```
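Note that `maxFilesPerTrigger` is a source option while the trigger belongs on the writer; in context, a minimal sketch (variable and table names are illustrative):

```python
(spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 10)      # limit Delta files ingested per micro-batch
    .load(bronze_path)
    .writeStream
    .trigger(processingTime="5 minutes")   # pace the micro-batches
    .option("checkpointLocation", checkpoint_silver)
    .toTable("silver_liquid_clustered"))
```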

Enable stateful streaming optimizations:
```python
# Session-level Spark configs (not writeStream options); relevant for stateful queries
spark.conf.set("spark.sql.streaming.stateStore.providerClass",
               "com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
spark.conf.set("spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled", "true")
```

Split the explode operation into multiple steps with intermediate writes:
```python
# Step 1: stream the bronze subset into an intermediate table (no transformation yet)
bronze_subset = (
    spark.readStream
        .format("delta")
        .load(bronze_path)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_intermediate)
        .toTable("intermediate_table")
)

# Step 2: read the intermediate table and apply the explosion
silver_output = (
    spark.readStream
        .table("intermediate_table")
        .transform(your_explosion_logic)
        .writeStream
        .format("delta")
        .option("checkpointLocation", checkpoint_silver)
        .option("optimizeWrite", "true")
        .toTable("silver_liquid_clustered")
)
```
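For a one-shot replay like this, a bounded trigger can also help: it drains the backlog in rate-limited micro-batches and then stops. A sketch, assuming Spark 3.3+ where `availableNow` is available:

```python
# Process everything currently in the source, in capped micro-batches, then stop
(spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 10)     # keep each batch small despite the explosion
    .load(bronze_path)
    .transform(your_explosion_logic)
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_silver)
    .trigger(availableNow=True)           # shut down once the backlog is drained
    .toTable("silver_liquid_clustered"))
```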

Approach 2: Use CTAS with Batch Processing

For large-scale migration, streaming replay may not be optimal. Consider a batch migration approach:

```sql
-- Create a new liquid clustered table from scratch
CREATE TABLE silver_new
CLUSTER BY (your_clustering_columns)
AS SELECT * FROM (
  SELECT exploded_data
  FROM bronze_table
  LATERAL VIEW explode(json_field) AS exploded_data
);

-- Swap tables (each RENAME is atomic)
ALTER TABLE silver_old RENAME TO silver_backup;
ALTER TABLE silver_new RENAME TO silver;
```
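After the CTAS, you can confirm the clustering keys took effect (table name as above):

```python
# clusteringColumns is populated for liquid clustered Delta tables
spark.sql("DESCRIBE DETAIL silver_new").select("clusteringColumns").show(truncate=False)
```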

Process in incremental batches if the full dataset is too large:
```python
# Process date ranges incrementally
for date_range in date_partitions:
    df = (spark.read.format("delta")
          .load(bronze_path)
          .filter(f"date = '{date_range}'"))
    transformed = transform_and_explode(df)
    transformed.write.format("delta").mode("append").saveAsTable("silver_new")
```

Approach 3: Tune Cluster Configuration

Your driver memory issues require specific tuning:

Use high-memory driver nodes: Since you're already at 384 GB workers, ensure your driver has comparable memory (at least 128 GB for this workload).

Enable G1GC with proper sizing:
```
spark.driver.extraJavaOptions=-XX:+UseG1GC -XX:InitiatingHeapOccupancyPercent=35 -XX:ConcGCThreads=12
```

Reduce shuffle partitions to minimize task overhead:
```python
spark.conf.set("spark.sql.shuffle.partitions", "200") # Adjust based on data size
```
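Alternatively, adaptive query execution can right-size shuffle partitions at runtime; these are standard Spark settings (on by default in recent runtimes):

```python
spark.conf.set("spark.sql.adaptive.enabled", "true")                    # adaptive query execution
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") # merge small shuffle partitions
```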

Addressing Excessive Commit Files

The 53 commit files from processing only 100 records indicate that your stream is committing many tiny micro-batches and fragmenting the checkpoint. To resolve:

Use a fresh checkpoint location when replaying streams to avoid corrupted state.
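For example (the checkpoint path is illustrative; `startingVersion` is one way to ask the Delta source to replay from the beginning of the table's history):

```python
(spark.readStream
    .format("delta")
    .option("startingVersion", 0)  # replay from the first table version
    .load(bronze_path)
    .writeStream
    .option("checkpointLocation", "/checkpoints/silver_liquid_v2")  # brand-new location
    .toTable("silver_liquid_clustered"))
```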

Limit how many micro-batch entries the checkpoint retains:
```python
# Fewer retained batches means fewer offset/commit files kept in the checkpoint
spark.conf.set("spark.sql.streaming.minBatchesToRetain", "20")  # Spark default is 100
```

Increase batch intervals to reduce micro-batch frequency.

Unity Catalog Considerations

The Unity Catalog migration introduces additional complexity:

Verify external location permissions: Ensure your storage credential has proper access to the underlying data location.
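A quick way to inspect this, assuming an external location named `silver_location` (illustrative):

```python
# Lists the principals and privileges granted on the external location
spark.sql("SHOW GRANTS ON EXTERNAL LOCATION silver_location").show(truncate=False)
```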

Use shared access mode clusters: Unity Catalog requires shared access mode for fine-grained permissions.

Check credential scopes: If you hit a `MissingCredentialScopeException`, it points to permission issues that must be resolved before streaming will work reliably.

Final Recommendations

Given your specific situation (1:200,000 explosion ratio), avoid streaming replay for the initial migration. Use batch processing with the CTAS approach, then enable streaming on the new liquid clustered table for incremental updates going forward. The streaming architecture wasn't designed for this scale of transformation during backfill operations, and liquid clustering's incremental optimization works best when applied to steady-state streaming workloads.

After migration, run `OPTIMIZE` regularly to maintain clustering quality:
```sql
OPTIMIZE silver_table;
```
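Liquid clustering optimizes incrementally, so routine `OPTIMIZE` only clusters newly written data; if you later change the clustering keys, a full recluster can help (the `FULL` keyword requires a recent runtime):

```python
# Recluster all records, not just newly written ones
spark.sql("OPTIMIZE silver_table FULL")
```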