2 weeks ago
Problem: Queries on our 100M+ record Iceberg tables were taking 45+ seconds.
Solution: Implemented lightweight zonemap indexing that tracks min/max values per file.
Quick Implementation
def apply_zonemap_pruning(table_path, predicate_value):
    # Load the zonemap index (one row per data file: file_path, min_value, max_value)
    zonemap = spark.read.parquet(f"{table_path}/_zonemaps")

    # Keep only files whose [min_value, max_value] range can contain the predicate value
    relevant_files = zonemap.filter(
        (zonemap.min_value <= predicate_value) &
        (zonemap.max_value >= predicate_value)
    ).select("file_path").collect()

    # Read only the relevant files instead of a full table scan
    return spark.read.parquet(*[f.file_path for f in relevant_files])
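For completeness, a minimal sketch of how the _zonemaps index itself can be built (not shown above; the column name event_ts is just an illustration, adapt it to your schema):
from pyspark.sql import functions as F

def build_zonemap(table_path, column="event_ts"):
    # Tag each row with the data file it came from
    df = (spark.read.parquet(table_path)
          .withColumn("file_path", F.input_file_name()))

    # One row per file: min/max of the indexed column
    zonemap = (df.groupBy("file_path")
                 .agg(F.min(column).alias("min_value"),
                      F.max(column).alias("max_value")))

    # Persist alongside the table so apply_zonemap_pruning can read it
    zonemap.write.mode("overwrite").parquet(f"{table_path}/_zonemaps")
The index needs to be rebuilt (or incrementally updated) whenever files are added or compacted, otherwise the min/max values go stale.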
Results
Key insight: Most queries only need 1-2% of files. Zonemap helps identify them instantly.
We are currently adding Bloom filters for even better performance. Has anyone tried similar indexing strategies?
Would love to hear your approaches!
2 weeks ago
Hi @ck7007,
That's a great optimization! You can also extend zonemap pruning to multiple predicates. For example, combine a date range with a categorical filter:
# Expanded example: date range + extra column
relevant_files = zonemap.filter(
    (zonemap.min_date <= query_end) &
    (zonemap.max_date >= query_start) &
    (zonemap.region == query_region)
).select("file_path").collect()
Only files overlapping the date range and matching the region are read, making pruning even more selective.
On top of that, Bloom Filters nicely complement zonemaps for point lookups (exact keys). Below is a minimal, file-level Bloom workflow:
1) Build a Bloom index per file
from pyspark.sql.functions import input_file_name
from pyspark.sql import Row

# pseudo helper: create_bloom(values, column) -> bytes of one file's Bloom filter
# You can implement this with a library (e.g., pybloom) or a JVM Bloom impl via UDF/Scala.
def build_bloom_index(table_path, column="user_id"):
    # Read the Iceberg table (or Parquet paths) so you can group by file
    df = spark.read.format("iceberg").load(table_path)

    # Make sure df has a column with the file path; if not, derive it from metadata.
    # Many engines expose input_file_name():
    df_with_file = df.withColumn("_file_path", input_file_name())

    # Build one Bloom filter per file
    bloom_index_rdd = (
        df_with_file
        .select("_file_path", column)
        .rdd
        .groupBy(lambda r: r["_file_path"])  # group rows by file
        .map(lambda kv: (
            kv[0],  # file_path
            create_bloom((row[column] for row in kv[1]), column)  # your custom bloom builder -> bytes
        ))
        .map(lambda t: Row(file_path=t[0], bloom_bytes=t[1]))
    )

    spark.createDataFrame(bloom_index_rdd) \
        .write.mode("overwrite").parquet(f"{table_path}/_bloom")
2) Use the Bloom index for pruning
from pyspark.sql.functions import lit, udf
from pyspark.sql.types import BooleanType

# pseudo helper: bloom_lookup(bloom_bytes, key) -> bool
@udf(BooleanType())
def bloom_might_contain(bloom_bytes, key):
    return bloom_lookup(bloom_bytes, key)  # implement based on your bloom format

def apply_bloom_pruning(table_path, lookup_id):
    # Load the Bloom index
    bloom = spark.read.parquet(f"{table_path}/_bloom")  # columns: file_path, bloom_bytes

    # Keep only files that MIGHT contain the key (may include false positives, never false negatives)
    candidate_files = (bloom
        .filter(bloom_might_contain(bloom["bloom_bytes"], lit(lookup_id)))
        .select("file_path")
        .collect()
    )

    # Read only those candidate files
    return spark.read.parquet(*[r.file_path for r in candidate_files])
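If you want to avoid an external dependency, here is one possible pure-Python backing for the create_bloom / bloom_lookup pseudo helpers above. The sizes (bits per filter, number of hash probes) are illustrative assumptions, so tune them for your keys per file and target false-positive rate:
import hashlib

BLOOM_BITS = 1 << 20   # bits per file-level filter (~128 KB); assumption, tune as needed
NUM_HASHES = 7         # hash probes per key; assumption, tune as needed

def _bit_positions(key, m=BLOOM_BITS, k=NUM_HASHES):
    # Derive k bit positions from one SHA-256 digest via double hashing
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    h1 = int.from_bytes(digest[:8], "big")
    h2 = int.from_bytes(digest[8:16], "big")
    return [(h1 + i * h2) % m for i in range(k)]

def create_bloom(values, column=None):
    # Build one file's Bloom filter and return it as bytes (stored in bloom_bytes)
    bits = bytearray(BLOOM_BITS // 8)
    for v in values:
        for pos in _bit_positions(v):
            bits[pos // 8] |= 1 << (pos % 8)
    return bytes(bits)

def bloom_lookup(bloom_bytes, key):
    # True = key MIGHT be present; False = key is definitely absent
    return all(bloom_bytes[pos // 8] & (1 << (pos % 8)) for pos in _bit_positions(key))
Double hashing keeps it to one digest per key while still giving k reasonably independent probes, and plain bytes serialize cleanly into the Parquet binary column and the UDF closure.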
Why both?
Zonemap → best for ranges (e.g., date/time, monotonically increasing IDs).
Bloom filter → best for point lookups (e.g., user_id = 123456).
Used together, they drastically shrink the scan set: zonemap narrows by ranges, Bloom narrows by exact keys.
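Putting the two together might look like the sketch below, assuming the _zonemaps and _bloom indexes and the bloom_might_contain UDF from above (query_start, query_end, and lookup_id are hypothetical query parameters):
def apply_combined_pruning(table_path, query_start, query_end, lookup_id):
    # Zonemap pass: files whose [min, max] range overlaps the query range
    zonemap = spark.read.parquet(f"{table_path}/_zonemaps")
    range_files = {r.file_path for r in zonemap.filter(
        (zonemap.min_value <= query_end) & (zonemap.max_value >= query_start)
    ).select("file_path").collect()}

    # Bloom pass: files that might contain the exact key
    bloom = spark.read.parquet(f"{table_path}/_bloom")
    key_files = {r.file_path for r in bloom.filter(
        bloom_might_contain(bloom["bloom_bytes"], lit(lookup_id))
    ).select("file_path").collect()}

    # Only files that survive both filters get scanned
    return spark.read.parquet(*sorted(range_files & key_files))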
2 weeks ago
Thanks for sharing, @ck7007
2 weeks ago
Thanks, glad you found it helpful.
Your detailed Bloom filter implementation above is fantastic, especially the RDD-based grouping approach for file-level filters. Have you measured the memory overhead in production?
I'm curious about your experience with false positive rates. We're currently at 0.01 but considering 0.001 for our high-value queries. What's worked best for you?
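For anyone weighing the same trade-off, the standard Bloom sizing formula m = -n * ln(p) / (ln 2)^2 gives a quick estimate of the memory cost; the figure of ~1M distinct keys per file below is just an assumed example:
import math

def bloom_sizing(n_keys, fpp):
    # Optimal bit count and hash count for n_keys at a target false-positive rate
    m_bits = math.ceil(-n_keys * math.log(fpp) / (math.log(2) ** 2))
    k_hashes = max(1, round((m_bits / n_keys) * math.log(2)))
    return m_bits, k_hashes

for fpp in (0.01, 0.001):
    m, k = bloom_sizing(1_000_000, fpp)   # assume ~1M distinct keys per file
    print(f"fpp={fpp}: ~{m / 8 / 2**20:.2f} MiB per filter, {k} hashes")
# fpp=0.01:  ~1.14 MiB per filter, 7 hashes
# fpp=0.001: ~1.71 MiB per filter, 10 hashes
Tightening from 0.01 to 0.001 costs roughly 50% more memory per filter, which can be worth it when a false positive triggers an expensive file read.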
Looking forward to more technical exchanges
yesterday
Hi @ck7007, @WiliamRosa!
I have a question: why are you actually doing this? I'm not fully familiar with your exact setup (Iceberg), but my understanding is that Iceberg already stores these stats (min/max) in the manifests, and Spark should be able to leverage them to skip files that don't contain relevant data.
Could you share a bit more detail? For example:
Which engine version are you using?
Did you check the Spark physical plan to verify whether the manifest pruning is being applied correctly?
Is it possible that the query or engine isnโt pushing down the filter as expected?
As I understand it, the manifest tree should already allow you to only read the necessary files without doing a full scan. So I'm not sure if this is an engine limitation, a query issue, or something else I'm missing.
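For example, a quick way to check (the table name db.events and the column event_ts are just placeholders):
# Per-file min/max stats that Iceberg already keeps in its manifests
spark.sql("""
    SELECT file_path, record_count, lower_bounds, upper_bounds
    FROM db.events.files
""").show(truncate=False)

# With predicate pushdown working, the Iceberg scan in the physical plan should
# report the pushed filters, and only matching data files are planned for the scan
spark.table("db.events").filter("event_ts >= '2024-01-01'").explain(True)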
If you could provide more context, that would be really valuable for everyone here. Thanks!
Isi