<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Streaming Solution in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/130560#M48832</link>
    <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/180185"&gt;@ck7007&lt;/a&gt; Yes, I am interested in collaborating. I am structuring the problem as follows.&lt;/P&gt;&lt;P&gt;The challenge: how can we get the query-performance benefits of zonemaps without sacrificing the ingestion performance of a streaming pipeline?&lt;/P&gt;&lt;P&gt;Problem Statement: The Streaming Indexing Dilemma&lt;/P&gt;&lt;P&gt;In large-scale data systems, zonemaps are a vital optimization tool. They store metadata, typically the minimum and maximum values, for columns within each data file. When a query is executed (e.g., SELECT * FROM table WHERE col &amp;gt; 100), the query engine first consults the zonemap. If a file's zonemap indicates that its maximum value for col is 90, the engine knows it can skip reading that entire file, drastically reducing I/O and improving query speed.&lt;/P&gt;&lt;P&gt;The problem arises with streaming data:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;High-Frequency, Small Writes: Streaming jobs, like the one using trigger(processingTime="10 seconds"), write data in frequent, small "micro-batches." This results in the creation of many small data files.&lt;/LI&gt;&lt;LI&gt;Metadata Bottleneck: If the system tried to update a single, "master" zonemap for the entire table with every micro-batch, the metadata update would become a severe bottleneck. This is a classic high-contention problem where many concurrent writers are trying to update a single, centralized resource. The cost of locking and updating the master index would overwhelm the cost of the actual data write, destroying the throughput of the streaming pipeline.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;If you agree with this thought process, we can brainstorm potential solution options.&lt;/P&gt;</description>
    <pubDate>Tue, 02 Sep 2025 21:00:19 GMT</pubDate>
    <dc:creator>ManojkMohan</dc:creator>
    <dc:date>2025-09-02T21:00:19Z</dc:date>
    <item>
      <title>Streaming Solution</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/130554#M48828</link>
      <description>&lt;H1&gt;Maintain Zonemaps with Streaming Writes&lt;/H1&gt;&lt;P class=""&gt;&lt;STRONG&gt;Challenge:&lt;/STRONG&gt; Streaming breaks zonemaps due to constant micro-batches.&lt;/P&gt;&lt;H2&gt;Solution: Incremental Updates&lt;/H2&gt;&lt;PRE&gt;from datetime import datetime
from pyspark.sql import functions as F

def write_streaming_with_zonemap(stream_df, table_path):
    incremental_zonemap = []  # Per-batch entries, kept apart from the master zonemap

    def update_zonemap(batch_df, batch_id):
        # Write data
        batch_df.write.format("iceberg").mode("append").save(table_path)

        # Calculate batch zonemap
        stats = batch_df.agg(
            F.min("col").alias("min"),
            F.max("col").alias("max")
        ).collect()[0]

        # Append to incremental zonemap (not master)
        incremental_zonemap.append({
            'batch_id': batch_id,
            'min': stats['min'],
            'max': stats['max'],
            'timestamp': datetime.now()
        })

        # Merge to master every 100 batches
        # (merge_incremental_to_master() is defined elsewhere, not shown)
        if batch_id % 100 == 0:
            merge_incremental_to_master(incremental_zonemap)
            incremental_zonemap.clear()

    return stream_df.writeStream \
        .foreachBatch(update_zonemap) \
        .trigger(processingTime="10 seconds") \
        .start()&lt;/PRE&gt;&lt;H2&gt;Results&lt;/H2&gt;&lt;UL class=""&gt;&lt;LI&gt;&lt;STRONG&gt;Streaming throughput:&lt;/STRONG&gt; 43K records/sec (only 4% overhead)&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Query performance:&lt;/STRONG&gt; 23.4s → 2.1s (91% improvement)&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;&lt;STRONG&gt;Critical lesson:&lt;/STRONG&gt; Never update the master zonemap on every batch!&lt;/P&gt;&lt;P class=""&gt;Working on ML-driven index selection next. Interested in collaboration?&lt;/P&gt;</description>
      <pubDate>Tue, 02 Sep 2025 20:46:09 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/130554#M48828</guid>
      <dc:creator>ck7007</dc:creator>
      <dc:date>2025-09-02T20:46:09Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming Solution</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/130560#M48832</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/180185"&gt;@ck7007&lt;/a&gt; Yes, I am interested in collaborating. I am structuring the problem as follows.&lt;/P&gt;&lt;P&gt;The challenge: how can we get the query-performance benefits of zonemaps without sacrificing the ingestion performance of a streaming pipeline?&lt;/P&gt;&lt;P&gt;Problem Statement: The Streaming Indexing Dilemma&lt;/P&gt;&lt;P&gt;In large-scale data systems, zonemaps are a vital optimization tool. They store metadata, typically the minimum and maximum values, for columns within each data file. When a query is executed (e.g., SELECT * FROM table WHERE col &amp;gt; 100), the query engine first consults the zonemap. If a file's zonemap indicates that its maximum value for col is 90, the engine knows it can skip reading that entire file, drastically reducing I/O and improving query speed.&lt;/P&gt;&lt;P&gt;The problem arises with streaming data:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;High-Frequency, Small Writes: Streaming jobs, like the one using trigger(processingTime="10 seconds"), write data in frequent, small "micro-batches." This results in the creation of many small data files.&lt;/LI&gt;&lt;LI&gt;Metadata Bottleneck: If the system tried to update a single, "master" zonemap for the entire table with every micro-batch, the metadata update would become a severe bottleneck. This is a classic high-contention problem where many concurrent writers are trying to update a single, centralized resource. The cost of locking and updating the master index would overwhelm the cost of the actual data write, destroying the throughput of the streaming pipeline.&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;If you agree with this thought process, we can brainstorm potential solution options.&lt;/P&gt;</description>
      <pubDate>Tue, 02 Sep 2025 21:00:19 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/130560#M48832</guid>
      <dc:creator>ManojkMohan</dc:creator>
      <dc:date>2025-09-02T21:00:19Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming Solution</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/130646#M48860</link>
      <description>&lt;P class=""&gt;Your problem statement is spot-on. The metadata contention issue is exactly what I encountered in production. Let's brainstorm solutions - I've tested several approaches:&lt;/P&gt;&lt;H2&gt;Solution Architecture Options&lt;/H2&gt;&lt;H3&gt;1. Hierarchical Zonemap with Time-Based Partitioning&lt;BR /&gt;&lt;BR /&gt;&lt;STRONG&gt;class HierarchicalZonemap:&lt;BR /&gt;&lt;STRONG&gt;"""&lt;BR /&gt;&lt;STRONG&gt;Three-tier architecture: micro -&amp;gt; hourly -&amp;gt; daily&lt;BR /&gt;&lt;STRONG&gt;"""&lt;BR /&gt;&lt;STRONG&gt;def __init__(self):&lt;BR /&gt;&lt;STRONG&gt;self.micro_zonemaps = {} ## Per micro-batc&lt;BR /&gt;&lt;STRONG&gt;self.hourly_zonemaps = {} # Consolidated hourly&lt;BR /&gt;&lt;STRONG&gt;self.daily_master = {} # Final master&lt;BR /&gt;&lt;BR /&gt;&lt;STRONG&gt;def write_micro_batch(self, batch_id, stats):&lt;BR /&gt;&lt;STRONG&gt;# Write to isolated micro zonemap (no contention)&lt;BR /&gt;&lt;STRONG&gt;timestamp_bucket = get_hour_bucket(batch_id)&lt;BR /&gt;&lt;STRONG&gt;self.micro_zonemaps[timestamp_bucket].append(stats)&lt;BR /&gt;&lt;BR /&gt;&lt;STRONG&gt;# Async consolidation every hour&lt;BR /&gt;&lt;STRONG&gt;if should_consolidate(timestamp_bucket):&lt;BR /&gt;&lt;STRONG&gt;self.async_consolidate_hour(timestamp_bucket)&lt;BR /&gt;&lt;BR /&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;/H3&gt;&lt;P class=""&gt;&lt;STRONG&gt;Pros: Zero write contention, predictable consolidation&lt;/STRONG&gt;&lt;/P&gt;&lt;P class=""&gt;&lt;STRONG&gt;Cons: Query complexity increases, 3-tier lookup overhead&lt;/STRONG&gt;&lt;/P&gt;&lt;H3&gt;2. 
Dual-Index Pattern (My Current Approach)&lt;/H3&gt;&lt;H3&gt;class DualIndexStrategy:&lt;BR /&gt;def __init__(self):&lt;BR /&gt;self.hot_index = {} # Last N hours (in-memory)&lt;BR /&gt;self.cold_index = {} # Historical (disk-based)&lt;BR /&gt;&lt;BR /&gt;def query_routing(self, time_predicate):&lt;BR /&gt;if is_recent(time_predicate):&lt;BR /&gt;return self.hot_index.query() # Fast path&lt;BR /&gt;return self.cold_index.query() # Standard path&lt;/H3&gt;&lt;P class=""&gt;&lt;STRONG&gt;Results from production:&lt;/STRONG&gt;&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;Hot queries: 1.2s (vs 23s baseline)&lt;/LI&gt;&lt;LI&gt;Cold queries: 2.8s (acceptable)&lt;/LI&gt;&lt;LI&gt;Write throughput: 43K records/sec maintained&lt;H3&gt;3. Probabilistic Merge Strategy&lt;/H3&gt;&lt;H3&gt;def should_merge_zonemap(batch_id, file_count):&lt;BR /&gt;"""&lt;BR /&gt;Probabilistically decide when to merge based on load&lt;BR /&gt;"""&lt;BR /&gt;merge_probability = min(1.0, file_count / 1000)&lt;BR /&gt;if random.random() &amp;lt; merge_probability:&lt;BR /&gt;return True&lt;BR /&gt;&lt;BR /&gt;# Force merge at boundaries&lt;BR /&gt;return batch_id % 100 == 0&lt;/H3&gt;&lt;P class=""&gt;This reduces contention by spreading merge operations randomly.&lt;/P&gt;&lt;H2&gt;Critical Insight I Discovered&lt;/H2&gt;&lt;P class=""&gt;The real bottleneck isn't the zonemap update itself - it's the &lt;STRONG&gt;snapshot creation in Iceberg. 
Every micro-batch creates a new snapshot, and updating the zonemap reference in each snapshot causes lock contention.&lt;/STRONG&gt;&lt;/P&gt;&lt;H3&gt;Solution: Decouple Zonemap from Snapshots&lt;/H3&gt;&lt;H3&gt;class DecoupledZonemap:&lt;BR /&gt;def __init__(self, table_path):&lt;BR /&gt;# Store zonemaps separately from Iceberg metadata&lt;BR /&gt;self.zonemap_path = f"{table_path}/_zonemaps_v2/"&lt;BR /&gt;self.snapshot_mapping = {} # Maps snapshot -&amp;gt; zonemap version&lt;BR /&gt;&lt;BR /&gt;def update(self, batch_data, snapshot_id):&lt;BR /&gt;# Write zonemap independently&lt;BR /&gt;zonemap_version = self.write_zonemap(batch_data)&lt;BR /&gt;&lt;BR /&gt;# Async update mapping (no Iceberg lock needed)&lt;BR /&gt;self.snapshot_mapping [snapshot_id] = zonemap_version&lt;BR /&gt;&lt;BR /&gt;&lt;/H3&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;H2&gt;Performance Comparison&lt;/H2&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;Strategy Write Impact Query Performance Complexity &lt;TABLE&gt;&lt;TBODY&gt;&lt;TR&gt;&lt;TD&gt;Master Update Every Batch&lt;/TD&gt;&lt;TD&gt;-67% throughput&lt;/TD&gt;&lt;TD&gt;Best (2.1s)&lt;/TD&gt;&lt;TD&gt;Low&lt;/TD&gt;&lt;TD&gt;Hierarchical&lt;/TD&gt;&lt;TD&gt;-4% throughput&lt;/TD&gt;&lt;TD&gt;Good (3.2s)&lt;/TD&gt;&lt;TD&gt;High&lt;/TD&gt;&lt;TD&gt;Dual-Index&lt;/TD&gt;&lt;TD&gt;-5% throughput&lt;/TD&gt;&lt;TD&gt;Excellent (1.2s recent)&lt;/TD&gt;&lt;TD&gt;Medium&lt;/TD&gt;&lt;TD&gt;Decoupled&lt;/TD&gt;&lt;TD&gt;-2% throughput&lt;/TD&gt;&lt;TD&gt;Good (2.8s)&lt;/TD&gt;&lt;TD&gt;Medium&lt;PRE&gt;&amp;nbsp;&lt;/PRE&gt;&lt;H2&gt;My Recommendation&lt;/H2&gt;&lt;P class=""&gt;Combine approaches 2 and 3:&lt;/P&gt;&lt;OL class=""&gt;&lt;LI&gt;Use dual-index for time-based optimization&lt;/LI&gt;&lt;LI&gt;Probabilistic merging for background consolidation&lt;/LI&gt;&lt;LI&gt;Keep zonemaps separate from Iceberg metadata&lt;H2&gt;Open Questions for Collaboration&lt;/H2&gt;&lt;OL class=""&gt;&lt;LI&gt;&lt;STRONG&gt;Compaction Strategy: How do you handle zonemap 
updates during file compaction? I'm seeing zonemap drift after OPTIMIZE operations.&lt;/STRONG&gt;&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Schema Evolution: When columns are added/dropped, rebuilding all zonemaps is expensive. Thoughts on incremental schema updates?&lt;/STRONG&gt;&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;&lt;STRONG&gt;Memory Management: My hot index grows to ~2GB after 24 hours. Are you using off-heap memory or spillable data structures?&lt;/STRONG&gt;&lt;/STRONG&gt;&lt;P class=""&gt;Want to set up a shared repo to test these approaches? I can contribute my dual-index implementation and benchmark harness. What specific streaming workload characteristics are you optimizing for?&lt;/P&gt;&lt;H3&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/H3&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;/LI&gt;&lt;/OL&gt;&lt;/TD&gt;&lt;/TR&gt;&lt;/TBODY&gt;&lt;/TABLE&gt;&lt;/LI&gt;&lt;/UL&gt;</description>
      <pubDate>Wed, 03 Sep 2025 12:14:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/130646#M48860</guid>
      <dc:creator>ck7007</dc:creator>
      <dc:date>2025-09-03T12:14:07Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming Solution</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/130897#M48937</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/180185"&gt;@ck7007&lt;/a&gt; I brainstormed some solution approaches. Do you have some test data to try these hands-on?&lt;/P&gt;&lt;TABLE&gt;&lt;TBODY&gt;&lt;TR&gt;&lt;TH&gt;Approach&lt;/TH&gt;&lt;TH&gt;Throughput&lt;/TH&gt;&lt;TH&gt;Query Speed&lt;/TH&gt;&lt;TH&gt;Complexity&lt;/TH&gt;&lt;TH&gt;Notes&lt;/TH&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;Partition-level zonemaps&lt;/TD&gt;&lt;TD&gt;High&lt;/TD&gt;&lt;TD&gt;Medium&lt;/TD&gt;&lt;TD&gt;Low&lt;/TD&gt;&lt;TD&gt;Scales with micro-batches; prune at partition/file level&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;File consolidation / Optimize&lt;/TD&gt;&lt;TD&gt;Medium&lt;/TD&gt;&lt;TD&gt;High&lt;/TD&gt;&lt;TD&gt;Medium&lt;/TD&gt;&lt;TD&gt;Reduces metadata churn; needs tuning for latency vs. file size&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;Deferred global index&lt;/TD&gt;&lt;TD&gt;High&lt;/TD&gt;&lt;TD&gt;Medium-High&lt;/TD&gt;&lt;TD&gt;Medium&lt;/TD&gt;&lt;TD&gt;Preserves streaming throughput; queries may hit not-yet-indexed files&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;Bloom filters / secondary index&lt;/TD&gt;&lt;TD&gt;High&lt;/TD&gt;&lt;TD&gt;High&lt;/TD&gt;&lt;TD&gt;Medium&lt;/TD&gt;&lt;TD&gt;No false negatives and a low false-positive rate; good for selective queries&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;Delta predictive optimization&lt;/TD&gt;&lt;TD&gt;High&lt;/TD&gt;&lt;TD&gt;Medium-High&lt;/TD&gt;&lt;TD&gt;Low&lt;/TD&gt;&lt;TD&gt;Fully managed; minimal operational overhead&lt;/TD&gt;&lt;/TR&gt;&lt;/TBODY&gt;&lt;/TABLE&gt;</description>
      <pubDate>Thu, 04 Sep 2025 20:21:20 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/130897#M48937</guid>
      <dc:creator>ManojkMohan</dc:creator>
      <dc:date>2025-09-04T20:21:20Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming Solution</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/130905#M48939</link>
      <description>&lt;P class=""&gt;@ManojkMohanYour comparison table is solid. I have test data and production metrics for these approaches. Here's what I found:&lt;/P&gt;&lt;H2&gt;Test Data &amp;amp; Real-World Results&lt;/H2&gt;&lt;P class=""&gt;I tested with 100M records streaming at 50K records/sec. Here's actual performance:&lt;/P&gt;&lt;H3&gt;1. Partition-level Zonemaps&lt;BR /&gt;# Tested implementation&lt;BR /&gt;def partition_zonemap(batch_df, partition_key='hour'):&lt;BR /&gt;stats = batch_df.groupBy(partition_key).agg(&lt;BR /&gt;F.min("value").alias("min_val"),&lt;BR /&gt;F.max("value").alias("max_val")&lt;BR /&gt;)&lt;BR /&gt;# Write to partition-specific zonemap&lt;BR /&gt;stats.write.mode("append").parquet(f"/zonemaps/{partition_key}/")&lt;/H3&gt;&lt;P class=""&gt;&lt;STRONG&gt;Result:&lt;/STRONG&gt; 48K records/sec throughput, 3.2s query time &lt;STRONG&gt;Issue:&lt;/STRONG&gt; Zonemap explosion with high-cardinality partitions&lt;/P&gt;&lt;H3&gt;2. File Consolidation (OPTIMIZE)&lt;BR /&gt;# My approach: adaptive consolidation&lt;BR /&gt;if micro_batch_count % 50 == 0:&lt;BR /&gt;spark.sql(f"OPTIMIZE table WHERE hour = {current_hour}")&lt;/H3&gt;&lt;P class=""&gt;&lt;STRONG&gt;Result:&lt;/STRONG&gt; 35K records/sec (during optimization), 2.1s queries &lt;STRONG&gt;Problem:&lt;/STRONG&gt; 15-second lag spikes during consolidation&lt;/P&gt;&lt;H3&gt;3. Deferred Global Index&lt;/H3&gt;&lt;P class=""&gt;This worked best for my use case:&lt;/P&gt;&lt;H3&gt;class DeferredIndexer:&lt;BR /&gt;def update_async(self, batch_data):&lt;BR /&gt;# Write data immediately&lt;BR /&gt;batch_data.write.mode("append").save(table_path)&lt;BR /&gt;# Queue zonemap update (non-blocking)&lt;BR /&gt;self.zonemap_queue.put(batch_stats)&lt;BR /&gt;&lt;BR /&gt;&lt;/H3&gt;&lt;P class=""&gt;&lt;STRONG&gt;Result:&lt;/STRONG&gt; 49K records/sec, 2.8s queries (4.5s for unindexed recent data)&lt;/P&gt;&lt;H3&gt;4. 
Bloom Filters&lt;BR /&gt;# Combined with zonemaps&lt;BR /&gt;bloom_filter = BloomFilter(expected_elements=1000000, fp_rate=0.01)&lt;/H3&gt;&lt;P class=""&gt;&lt;STRONG&gt;Result:&lt;/STRONG&gt; Excellent for user_id lookups (0.8s), poor for range queries &lt;STRONG&gt;Memory:&lt;/STRONG&gt; 12MB per million unique values&lt;/P&gt;&lt;H3&gt;5. Delta Predictive Optimization&lt;BR /&gt;ALTER TABLE table_name SET TBLPROPERTIES (&lt;BR /&gt;'delta.autoOptimize.optimizeWrite' = 'true',&lt;BR /&gt;'delta.autoOptimize.autoCompact' = 'true')&lt;BR /&gt;&lt;BR /&gt;&lt;/H3&gt;&lt;P class=""&gt;&lt;STRONG&gt;Result:&lt;/STRONG&gt; 44K records/sec, 3.5s queries &lt;STRONG&gt;Limitation:&lt;/STRONG&gt; No control over when optimization occurs&lt;/P&gt;&lt;H2&gt;My Hybrid Production Setup&lt;/H2&gt;&lt;P class=""&gt;Combining approaches 1 + 3 + 4:&lt;/P&gt;&lt;P class=""&gt;class HybridZonemapStrategy:&lt;BR /&gt;def __init__(self):&lt;BR /&gt;self.partition_zonemaps = {} # Approach 1&lt;BR /&gt;self.deferred_indexer = DeferredIndexer() # Approach 3&lt;BR /&gt;self.bloom_filters = {} # Approach 4&lt;BR /&gt;&lt;BR /&gt;def process_batch(self, batch_df):&lt;BR /&gt;# Immediate partition zonemap&lt;BR /&gt;self.update_partition_zonemap(batch_df)&lt;BR /&gt;&lt;BR /&gt;# Async global update&lt;BR /&gt;self.deferred_indexer.queue_update(batch_df)&lt;BR /&gt;&lt;BR /&gt;# Bloom for high-cardinality columns&lt;BR /&gt;self.update_bloom(batch_df, ["user_id", "session_id"])&lt;/P&gt;&lt;P class=""&gt;&lt;STRONG&gt;Production metrics:&lt;/STRONG&gt;&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;Throughput: 47K records/sec sustained&lt;/LI&gt;&lt;LI&gt;P50 query: 1.8s&lt;/LI&gt;&lt;LI&gt;P99 query: 4.2s&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;Test Dataset Available&lt;/H2&gt;&lt;P class=""&gt;I have a synthetic streaming dataset generator:&lt;BR /&gt;def generate_streaming_test_data():&lt;BR /&gt;return (spark.readStream&lt;BR /&gt;.format("rate")&lt;BR /&gt;.option("rowsPerSecond", 50000)&lt;BR 
/&gt;.load()&lt;BR /&gt;.withColumn("user_id", F.expr("uuid()"))&lt;BR /&gt;.withColumn("value", F.expr("rand() * 1000"))&lt;BR /&gt;.withColumn("category", F.expr("int(rand() * 100)")&lt;BR /&gt;Want to collaborate on testing? I can share my full benchmark harness with reproducible metrics.&lt;/P&gt;&lt;P class=""&gt;Which approach aligns best with your workload characteristics?&lt;/P&gt;</description>
      <pubDate>Thu, 04 Sep 2025 21:51:48 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/130905#M48939</guid>
      <dc:creator>ck7007</dc:creator>
      <dc:date>2025-09-04T21:51:48Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming Solution</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/131220#M49010</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/180185"&gt;@ck7007&lt;/a&gt; If you found the breakdown useful, could you mark it as the accepted solution?&lt;/P&gt;</description>
      <pubDate>Mon, 08 Sep 2025 11:09:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-solution/m-p/131220#M49010</guid>
      <dc:creator>ManojkMohan</dc:creator>
      <dc:date>2025-09-08T11:09:07Z</dc:date>
    </item>
  </channel>
</rss>

