<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Databricks read CDF by partitions for better performance? in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/databricks-read-cdf-by-partitions-for-better-performance/m-p/110383#M43557</link>
    <description>&lt;P&gt;I’m working with a large DataFrame in Databricks, processing it in a streaming-batch fashion (I’m reading as a stream, but using .trigger(availableNow=True) for batch-like processing).&lt;/P&gt;&lt;P&gt;I’m fetching around 40 GB of CDF updates daily and performing some heavy aggregations in foreachBatch mode.&lt;/P&gt;&lt;PRE&gt;# source_table_path: the CDF-enabled Delta table being read
df_streaming_components = (spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("maxBytesPerTrigger", "10G")
    .load(source_table_path))


def process_and_upsert(df, batch_id):
    # heavy aggregations and upsert
    pass


(df_streaming_components.writeStream.format(DeltaUtils.FORMAT_DELTA)
    .option("checkpointLocation", checkpoint_loc)
    .foreachBatch(process_and_upsert)
    .outputMode("update")
    .trigger(availableNow=True)
    .start()
    .awaitTermination())&lt;/PRE&gt;&lt;P&gt;&lt;SPAN&gt;The tables are partitioned by a key, e.g. class_number. Could I use that key to read the CDF changes partition by partition? For instance, I’d like to fetch the changes for class_number = "class_a", process and upsert them, then fetch the changes for "class_b", process and upsert those, and so on. Since my aggregations are heavy, it would be far quicker if I could pull changes filtered on my partition column.&lt;/SPAN&gt;&lt;/P&gt;</description>
    <pubDate>Mon, 17 Feb 2025 11:11:12 GMT</pubDate>
    <dc:creator>mjedy78</dc:creator>
    <dc:date>2025-02-17T11:11:12Z</dc:date>
    <item>
      <title>Databricks read CDF by partitions for better performance?</title>
      <link>https://community.databricks.com/t5/data-engineering/databricks-read-cdf-by-partitions-for-better-performance/m-p/110383#M43557</link>
      <description>&lt;P&gt;I’m working with a large DataFrame in Databricks, processing it in a streaming-batch fashion (I’m reading as a stream, but using .trigger(availableNow=True) for batch-like processing).&lt;/P&gt;&lt;P&gt;I’m fetching around 40 GB of CDF updates daily and performing some heavy aggregations in foreachBatch mode.&lt;/P&gt;&lt;PRE&gt;# source_table_path: the CDF-enabled Delta table being read
df_streaming_components = (spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("maxBytesPerTrigger", "10G")
    .load(source_table_path))


def process_and_upsert(df, batch_id):
    # heavy aggregations and upsert
    pass


(df_streaming_components.writeStream.format(DeltaUtils.FORMAT_DELTA)
    .option("checkpointLocation", checkpoint_loc)
    .foreachBatch(process_and_upsert)
    .outputMode("update")
    .trigger(availableNow=True)
    .start()
    .awaitTermination())&lt;/PRE&gt;&lt;P&gt;&lt;SPAN&gt;The tables are partitioned by a key, e.g. class_number. Could I use that key to read the CDF changes partition by partition? For instance, I’d like to fetch the changes for class_number = "class_a", process and upsert them, then fetch the changes for "class_b", process and upsert those, and so on. Since my aggregations are heavy, it would be far quicker if I could pull changes filtered on my partition column.&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 17 Feb 2025 11:11:12 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databricks-read-cdf-by-partitions-for-better-performance/m-p/110383#M43557</guid>
      <dc:creator>mjedy78</dc:creator>
      <dc:date>2025-02-17T11:11:12Z</dc:date>
    </item>
    <item>
      <title>Re: Databricks read CDF by partitions for better performance?</title>
      <link>https://community.databricks.com/t5/data-engineering/databricks-read-cdf-by-partitions-for-better-performance/m-p/110386#M43559</link>
      <description>&lt;P&gt;You can indeed leverage your partition column to read and process Change Data Feed (CDF) changes partition by partition. This approach can help you manage the processing load and improve performance. Here’s a general outline:&lt;/P&gt;&lt;P&gt;1. Read the CDF changes filtered by partition: read the change feed and apply a filter on the partition column.&lt;/P&gt;&lt;P&gt;2. Process each partition separately: iterate through the list of partition values and, for each one, read the filtered CDF changes and process them in foreachBatch mode.&lt;/P&gt;&lt;P&gt;Here’s an example of how you could implement this (source_table_path stands in for your CDF-enabled table, and process_and_upsert is your existing batch function):&lt;/P&gt;&lt;PRE&gt;import pyspark.sql.functions as F

# List of partition values (e.g., class numbers); add all of yours here
partition_values = ["class_a", "class_b", "class_c"]


def process_partition(partition_value):
    # Read the CDF changes, filtered to a single partition value
    df_streaming_components = (spark.readStream.format("delta")
        .option("readChangeFeed", "true")
        .option("maxBytesPerTrigger", "10G")
        .load(source_table_path)
        .filter(F.col("class_number") == partition_value))

    # Each partition gets its own checkpoint location so the
    # per-partition streams track their progress independently
    (df_streaming_components.writeStream.format("delta")
        .option("checkpointLocation", f"{checkpoint_loc}_{partition_value}")
        .foreachBatch(process_and_upsert)
        .outputMode("update")
        .trigger(availableNow=True)
        .start()
        .awaitTermination())


# Process each partition separately
for partition_value in partition_values:
    process_partition(partition_value)&lt;/PRE&gt;&lt;P&gt;In this example, we read the CDF changes for each partition value, filter on the partition column (class_number), and then process and upsert the changes separately per partition. Because the filter is on the table’s partition column, each pass only touches that partition’s changes, which keeps the heavy aggregations in smaller, more manageable batches.&lt;/P&gt;</description>
      <pubDate>Mon, 17 Feb 2025 12:02:02 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/databricks-read-cdf-by-partitions-for-better-performance/m-p/110386#M43559</guid>
      <dc:creator>cherry54wilder</dc:creator>
      <dc:date>2025-02-17T12:02:02Z</dc:date>
    </item>
  </channel>
</rss>

