<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Technical Deep Dive in Community Articles</title>
    <link>https://community.databricks.com/t5/community-articles/technical-deep-dive/m-p/130171#M655</link>
    <description>&lt;H1&gt;Bloom Filters + Zonemaps: The Ultimate Query Optimization Combo&lt;/H1&gt;&lt;P class=""&gt;After my zonemap post last week got great feedback, several of you asked about Bloom filter integration. Here's the complete implementation!&lt;/P&gt;&lt;H2&gt;Why Bloom Filters Changed Everything&lt;/H2&gt;&lt;P class=""&gt;Zonemaps are great for range queries, but what about:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;Exact match lookups?&lt;/LI&gt;&lt;LI&gt;IN clauses with multiple values?&lt;/LI&gt;&lt;LI&gt;High-cardinality string columns?&lt;/LI&gt;&lt;/UL&gt;&lt;P class=""&gt;Enter Bloom filters! Combined with zonemaps, we cut exact-match query time by &lt;STRONG&gt;93%&lt;/STRONG&gt;.&lt;/P&gt;&lt;H2&gt;The Architecture That Works&lt;/H2&gt;&lt;P&gt;Query → Bloom Filter Check (95% elimination) → Zonemap Pruning (80% elimination) → File Scan (minimal I/O)&lt;/P&gt;&lt;H2&gt;Complete Implementation&lt;/H2&gt;&lt;H3&gt;1. Bloom Filter Creation with Optimal Size&lt;/H3&gt;&lt;PRE&gt;
from pyspark.sql import functions as F
from pybloom_live import BloomFilter
import mmh3


class AdaptiveBloomFilter:
    def __init__(self, expected_elements, false_positive_rate=0.01):
        self.filter = BloomFilter(
            capacity=expected_elements,
            error_rate=false_positive_rate)
        self.actual_elements = 0

    def add_batch(self, spark_df, column):
        """
        Adds column values to Bloom filter
        """
        # Collects the distinct values to the driver, so this suits per-file batches
        unique_values = spark_df.select(column).distinct().collect()

        for row in unique_values:
            if row[0] is not None:
                self.filter.add(str(row[0]))
                self.actual_elements += 1

            # Auto-resize if getting full
            if self.actual_elements &amp;gt; self.filter.capacity * 0.8:
                self._resize()

    def _resize(self):
        """
        Creates new filter with 2x capacity
        """
        new_capacity = self.filter.capacity * 2
        # Implementation details...

    def might_contain(self, value):
        # Values are stored as strings, so compare as strings
        return str(value) in self.filter
&lt;/PRE&gt;&lt;H3&gt;2. Integrated Index Structure&lt;/H3&gt;&lt;PRE&gt;
class HybridIndex:
    """
    Combines Zonemap and Bloom Filter for maximum pruning
    """
    def __init__(self, table_path):
        self.table_path = table_path
        self.zonemap = {}
        self.bloom_filters = {}
        self.stats = {}

    def build_comprehensive_index(self):
        # list_all_parquet_files and the other helpers called here are not shown in this post
        parquet_files = list_all_parquet_files(self.table_path)

        for file_path in parquet_files:
            df = spark.read.parquet(file_path)
            row_count = df.count()  # computed once and reused below

            # Build zonemap
            self.zonemap[file_path] = {
                col: {
                    'min': df.agg(F.min(col)).collect()[0][0],
                    'max': df.agg(F.max(col)).collect()[0][0],
                    'nulls': df.filter(F.col(col).isNull()).count(),
                    'count': row_count}
                for col in df.columns}

            # Build Bloom filter for high-cardinality columns
            for col in self._identify_high_cardinality_cols(df):
                if file_path not in self.bloom_filters:
                    self.bloom_filters[file_path] = {}

                bloom = AdaptiveBloomFilter(
                    expected_elements=max(row_count, 1),  # capacity must be positive
                    false_positive_rate=0.001
                )
                bloom.add_batch(df, col)
                self.bloom_filters[file_path][col] = bloom

            # Collect statistics
            self.stats[file_path] = {
                'row_count': row_count,
                'size_bytes': get_file_size(file_path),
                'columns': df.columns,
                'compression_ratio': calculate_compression_ratio(file_path)
            }

    def prune_files(self, predicate):
        """
        Uses both indexes to eliminate files
        """
        candidate_files = []

        for file_path in self.zonemap.keys():
            # Check zonemap first (cheap)
            if self._zonemap_matches(file_path, predicate):
                # Then check Bloom filter (if applicable)
                if self._bloom_filter_matches(file_path, predicate):
                    candidate_files.append(file_path)

        pruning_rate = 1 - (len(candidate_files) / len(self.zonemap))
        print(f"✨ Pruned {pruning_rate:.1%} of files!")

        return candidate_files
&lt;/PRE&gt;&lt;H3&gt;3. Query Optimizer Integration&lt;/H3&gt;&lt;PRE&gt;
def optimize_spark_query(query_plan, hybrid_index):
    """
    Rewrites Spark query plan to use hybrid index
    """
    # Extract predicates from query
    predicates = extract_predicates_from_plan(query_plan)

    # Get pruned file list
    valid_files = hybrid_index.prune_files(predicates)

    # Inject file filter into query plan
    optimized_plan = query_plan.transform(
        lambda node: inject_file_filter(node, valid_files)
        if isinstance(node, FileScan) else node
    )

    return optimized_plan


# Hook into Spark optimizer
# Key as given in this post; Spark's documented hook for custom rules is
# spark.sql.extensions, set when the session is built
spark.conf.set(
    "spark.sql.adaptive.optimizer.extraOptimizers",
    "com.mycompany.HybridIndexOptimizer")
&lt;/PRE&gt;&lt;H2&gt;Benchmark Results That Blew My Mind&lt;/H2&gt;&lt;H3&gt;Test: 1B rows, 500GB Parquet files&lt;/H3&gt;&lt;TABLE&gt;&lt;TBODY&gt;&lt;TR&gt;&lt;TH&gt;Query Type&lt;/TH&gt;&lt;TH&gt;No Index&lt;/TH&gt;&lt;TH&gt;Zonemap Only&lt;/TH&gt;&lt;TH&gt;Zonemap + Bloom&lt;/TH&gt;&lt;TH&gt;Improvement (vs. No Index)&lt;/TH&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;Range Query&lt;/TD&gt;&lt;TD&gt;45.2s&lt;/TD&gt;&lt;TD&gt;8.3s&lt;/TD&gt;&lt;TD&gt;7.9s&lt;/TD&gt;&lt;TD&gt;82.5%&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;Exact Match&lt;/TD&gt;&lt;TD&gt;43.7s&lt;/TD&gt;&lt;TD&gt;41.2s&lt;/TD&gt;&lt;TD&gt;3.1s&lt;/TD&gt;&lt;TD&gt;&lt;STRONG&gt;92.9%&lt;/STRONG&gt;&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;IN Clause (10 values)&lt;/TD&gt;&lt;TD&gt;44.5s&lt;/TD&gt;&lt;TD&gt;42.8s&lt;/TD&gt;&lt;TD&gt;4.7s&lt;/TD&gt;&lt;TD&gt;&lt;STRONG&gt;89.4%&lt;/STRONG&gt;&lt;/TD&gt;&lt;/TR&gt;&lt;TR&gt;&lt;TD&gt;JOIN on high-cardinality&lt;/TD&gt;&lt;TD&gt;126.3s&lt;/TD&gt;&lt;TD&gt;119.7s&lt;/TD&gt;&lt;TD&gt;14.2s&lt;/TD&gt;&lt;TD&gt;&lt;STRONG&gt;88.8%&lt;/STRONG&gt;&lt;/TD&gt;&lt;/TR&gt;&lt;/TBODY&gt;&lt;/TABLE&gt;&lt;H2&gt;Memory Overhead Analysis&lt;/H2&gt;&lt;P class=""&gt;Concerned about memory? Here's what I found:&lt;/P&gt;&lt;PRE&gt;
import math


def calculate_index_memory(table_stats, false_positive_rate=0.001):
    """
    Estimates memory footprint
    """
    zonemap_size = table_stats['num_files'] * \
        table_stats['num_columns'] * \
        32  # bytes per min/max entry

    # An optimally sized Bloom filter needs about 1.44 * log2(1/p) bits per element
    bits_per_element = 1.44 * math.log2(1 / false_positive_rate)
    bloom_size = table_stats['num_files'] * \
        table_stats['high_card_columns'] * \
        (table_stats['distinct_values'] * bits_per_element / 8)  # bits to bytes

    total_mb = (zonemap_size + bloom_size) / 1048576

    return {
        'zonemap_mb': zonemap_size / 1048576,
        'bloom_mb': bloom_size / 1048576,
        'total_mb': total_mb,
        'percentage_of_data': (total_mb / table_stats['data_size_mb']) * 100
    }

# Real example from production:
# 500GB table → 47MB index (0.009% overhead!)
&lt;/PRE&gt;&lt;H2&gt;Gotchas and Solutions&lt;/H2&gt;&lt;P class=""&gt;&lt;STRONG&gt;Problem 1:&lt;/STRONG&gt; Bloom filter size explosion with updates&lt;BR /&gt;&lt;STRONG&gt;Solution:&lt;/STRONG&gt; Incremental Bloom filter chains&lt;/P&gt;&lt;P class=""&gt;&lt;STRONG&gt;Problem 2:&lt;/STRONG&gt; False positive accumulation&lt;BR /&gt;&lt;STRONG&gt;Solution:&lt;/STRONG&gt; Periodic rebuild based on query patterns&lt;/P&gt;&lt;P class=""&gt;&lt;STRONG&gt;Problem 3:&lt;/STRONG&gt; Serialization overhead&lt;BR /&gt;&lt;STRONG&gt;Solution:&lt;/STRONG&gt; Binary format with Kryo serialization&lt;/P&gt;&lt;H2&gt;Production Deployment Tips&lt;/H2&gt;&lt;OL class=""&gt;&lt;LI&gt;&lt;STRONG&gt;Start with read-heavy tables&lt;/STRONG&gt;: best ROI&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Monitor false positive rates&lt;/STRONG&gt;: adjust the target rate when they drift&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Use async index updates&lt;/STRONG&gt;: don't block writes&lt;/LI&gt;&lt;LI&gt;&lt;STRONG&gt;Cache indexes in Redis&lt;/STRONG&gt;: shared across clusters&lt;/LI&gt;&lt;/OL&gt;&lt;H2&gt;Next Evolution: ML-Driven Index Selection&lt;/H2&gt;&lt;P class=""&gt;I'm working on using query logs to automatically determine:&lt;/P&gt;&lt;UL class=""&gt;&lt;LI&gt;Which columns need Bloom filters&lt;/LI&gt;&lt;LI&gt;Optimal false positive rates&lt;/LI&gt;&lt;LI&gt;When to rebuild indexes&lt;/LI&gt;&lt;/UL&gt;&lt;H2&gt;Your Turn!&lt;/H2&gt;&lt;OL class=""&gt;&lt;LI&gt;What's your experience with Bloom filters in Spark?&lt;/LI&gt;&lt;LI&gt;Has anyone tried HyperLogLog for cardinality estimation?&lt;/LI&gt;&lt;LI&gt;Interested in a Databricks notebook with full implementation?&lt;/LI&gt;&lt;/OL&gt;&lt;P class=""&gt;&lt;STRONG&gt;GitHub Repo:&lt;/STRONG&gt; [Coming this weekend with full code]&lt;/P&gt;&lt;P class=""&gt;Let's push the boundaries of what's possible! &lt;span class="lia-unicode-emoji" title=":rocket:"&gt;🚀&lt;/span&gt;&lt;/P&gt;&lt;P class=""&gt;#BloomFilter #AdvancedOptimization #Spark #DataEngineering #Performance&lt;/P&gt;</description>
    <pubDate>Fri, 29 Aug 2025 14:28:00 GMT</pubDate>
    <dc:creator>ck7007</dc:creator>
    <dc:date>2025-08-29T14:28:00Z</dc:date>
    <item>
      <title>Technical Deep Dive</title>
      <link>https://community.databricks.com/t5/community-articles/technical-deep-dive/m-p/130171#M655</link>
      <pubDate>Fri, 29 Aug 2025 14:28:00 GMT</pubDate>
      <guid>https://community.databricks.com/t5/community-articles/technical-deep-dive/m-p/130171#M655</guid>
      <dc:creator>ck7007</dc:creator>
      <dc:date>2025-08-29T14:28:00Z</dc:date>
    </item>
  </channel>
</rss>

