Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Databricks read CDF by partitions for better performance?

mjedy78
New Contributor II

I’m working with a large DataFrame in Databricks, processing it in a streaming-batch fashion (I read it as a stream, but use .trigger(availableNow=True) for batch-like processing).

I’m fetching around 40GB of CDF updates daily and performing some heavy aggregations in foreachBatch mode.

```python
spark.readStream.format("delta") \
    .option("readChangeFeed", "true") \
    .option("maxBytesPerTrigger", "10G")


def process_and_upsert(df, batch_id):
    # heavy aggs and upsert
    pass

df_streaming_components.writeStream.format(DeltaUtils.FORMAT_DELTA) \
    .option("checkpointLocation", checkpoint_loc) \
    .foreachBatch(process_and_upsert) \
    .outputMode("update") \
    .trigger(availableNow=True) \
    .start() \
    .awaitTermination()
```

The tables are partitioned on a key, e.g. class_number. Could I use that to read the CDF changes partition by partition? For instance, I would like to get the changes for class_number = "class_a", process and upsert them, then get the changes_df for "class_b", process and upsert, and so on. Since my aggregations are heavy, it would be far quicker if I could fetch the changes based on my partition column.

1 REPLY

cherry54wilder
New Contributor II

You can indeed leverage your partition column to read and process Change Data Feed (CDF) changes in partitions. This approach can help you manage the processing load and improve performance. Here's a general outline of how you can achieve this:

1. **Read the CDF changes with filtering by partition:**
You can read the CDF changes for a specific partition using a filter on the partition column.

2. **Process each partition separately:**
Iterate through the list of partition values, and for each partition, read the CDF changes and process them in the `foreachBatch` mode.

Here's an example of how you can implement this:

```python
import pyspark.sql.functions as F

# List of partition values (e.g., class numbers)
partition_values = ["class_a", "class_b", "class_c"]  # Add all your partition values here

def process_partition(partition_value):
    # Read the CDF changes for the specific partition
    # (source_table_path is the path or name of the CDF-enabled source table)
    df_streaming_components = spark.readStream.format("delta") \
        .option("readChangeFeed", "true") \
        .option("maxBytesPerTrigger", "10G") \
        .load(source_table_path) \
        .filter(F.col("class_number") == partition_value)  # Filter by partition value

    # Use a separate checkpoint location per partition so each filtered
    # stream tracks its progress independently
    df_streaming_components.writeStream.format("delta") \
        .option("checkpointLocation", f"{checkpoint_loc}_{partition_value}") \
        .foreachBatch(process_and_upsert) \
        .outputMode("update") \
        .trigger(availableNow=True) \
        .start() \
        .awaitTermination()

# Process each partition separately
for partition_value in partition_values:
    process_partition(partition_value)
```

In this example, we read the CDF changes for each partition value, filter on the partition column (`class_number`), and then process and upsert the changes separately for each partition. Each filtered stream has its own checkpoint location, so its progress is tracked independently, and each run of the heavy aggregations only has to work on one partition's worth of changes rather than the full 40GB of daily updates.
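If you'd rather not hard-code the partition values, you could derive them from the source table before looping. Here's a minimal sketch, assuming the same `source_table_path` and `class_number` column as above (names used for illustration only):

```python
# Sketch: derive the partition values from the source table instead of
# hard-coding them. Assumes source_table_path and the class_number
# partition column from the example above.
partition_values = [
    row["class_number"]
    for row in (
        spark.read.format("delta")
        .load(source_table_path)
        .select("class_number")
        .distinct()
        .collect()
    )
]

for partition_value in partition_values:
    process_partition(partition_value)
```

Note that each partition value still gets its own streaming query and checkpoint, and the loop runs them one after another.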
