Data Governance

Achieved 87% Query Performance Improvement with Custom Zonemap Indexing

ck7007
New Contributor II

Problem: Queries on our 100M+ record Iceberg tables were taking 45+ seconds.

Solution: Implemented lightweight zonemap indexing that tracks min/max values per file.

Quick Implementation

def apply_zonemap_pruning(table_path, predicate_value):
    # Load the zonemap index
    zonemap = spark.read.parquet(f"{table_path}/_zonemaps")

    # Keep only files whose [min_value, max_value] range can contain the predicate value
    relevant_files = zonemap.filter(
        (zonemap.min_value <= predicate_value) &
        (zonemap.max_value >= predicate_value)
    ).select("file_path").collect()

    # Read only the relevant files instead of doing a full table scan
    return spark.read.parquet(*[f.file_path for f in relevant_files])
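
A minimal sketch of how such a _zonemaps index could be built (assumptions: the data files are readable directly as Parquet, mirroring the read path above, and the indexed column is called "id"; adjust to your schema):

from pyspark.sql.functions import input_file_name, min as sf_min, max as sf_max

def build_zonemap(table_path, column="id"):
    # Sketch only: the column name and storage layout are assumptions
    df = spark.read.parquet(table_path)

    # One row per data file with the min/max of the indexed column
    zonemap = (
        df.withColumn("file_path", input_file_name())
          .groupBy("file_path")
          .agg(sf_min(column).alias("min_value"), sf_max(column).alias("max_value"))
    )

    # Persist next to the table so apply_zonemap_pruning can read it from _zonemaps
    zonemap.write.mode("overwrite").parquet(f"{table_path}/_zonemaps")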
Results

  • Before: 42.3 seconds (scanning 1000 files)
  • After: 5.4 seconds (scanning 12 files)
  • Cost savings: 87% reduction in compute

Key insight: Most queries only need 1-2% of files. Zonemap helps identify them instantly.

We are currently adding Bloom filters for even better performance. Has anyone tried similar indexing strategies?

Would love to hear your approaches!

4 REPLIES

WiliamRosa
New Contributor II

Hi @ck7007,
That's a great optimization! You can also extend zonemap pruning to multiple predicates. For example, combine a date range with a categorical filter:

# Expanded example: date range + extra column
relevant_files = zonemap.filter(
    (zonemap.min_date <= query_end) &
    (zonemap.max_date >= query_start) &
    (zonemap.region == query_region)
).select("file_path").collect()

Only files overlapping the date range and matching the region are read, making pruning even more selective.
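
A minimal sketch of how the zonemap could be extended with those extra columns (the column names event_date and region, and the assumption that each file holds a single region, e.g. region-partitioned data, are illustrative):

from pyspark.sql.functions import input_file_name, min as sf_min, max as sf_max, first

def build_multi_column_zonemap(table_path):
    # Sketch only: assumes region-partitioned files so first("region") is representative
    df = spark.read.parquet(table_path)

    zonemap = (
        df.withColumn("file_path", input_file_name())
          .groupBy("file_path")
          .agg(
              sf_min("event_date").alias("min_date"),  # assumed date column
              sf_max("event_date").alias("max_date"),
              first("region").alias("region"),
          )
    )

    zonemap.write.mode("overwrite").parquet(f"{table_path}/_zonemaps")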

On top of that, Bloom Filters nicely complement zonemaps for point lookups (exact keys). Below is a minimal, file-level Bloom workflow:

1) Build a Bloom index per file

from pyspark.sql import Row
from pyspark.sql.functions import input_file_name

# pseudo helper: create_bloom(values, column) -> serialized Bloom filter (bytes) for one file
# You can implement this with a library (e.g., pybloom) or a JVM Bloom implementation via UDF/Scala.

def build_bloom_index(table_path, column="user_id"):
    # Read the Iceberg table (or the Parquet paths) so rows can be grouped by file
    df = spark.read.format("iceberg").load(table_path)

    # Make sure df carries the source file path; input_file_name() exposes it
    df_with_file = df.withColumn("_file_path", input_file_name())

    # Build one Bloom filter per file
    bloom_index_rdd = (
        df_with_file
        .select("_file_path", column)
        .rdd
        .groupBy(lambda r: r["_file_path"])  # group rows by file
        .map(lambda kv: (
            kv[0],                                                 # file_path
            create_bloom((row[column] for row in kv[1]), column),  # custom bloom builder -> bytes
        ))
        .map(lambda t: Row(file_path=t[0], bloom_bytes=t[1]))
    )

    spark.createDataFrame(bloom_index_rdd) \
        .write.mode("overwrite").parquet(f"{table_path}/_bloom")
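
A hypothetical invocation (the path below is a placeholder) and a quick check of what was written:

# Build the per-file Bloom index once; re-run after new data files are committed
build_bloom_index("s3://my-bucket/warehouse/db/events", column="user_id")

# Inspect the index: one row per file with its serialized Bloom filter
spark.read.parquet("s3://my-bucket/warehouse/db/events/_bloom").show(truncate=False)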


2) Use the Bloom index for pruning

from pyspark.sql.functions import lit, udf
from pyspark.sql.types import BooleanType

# pseudo helper: bloom_lookup(bloom_bytes, key) -> bool ("might contain")
@udf(BooleanType())
def bloom_might_contain(bloom_bytes, key):
    return bloom_lookup(bloom_bytes, key)  # implement based on your Bloom format

def apply_bloom_pruning(table_path, lookup_id):
    # Load the Bloom index (columns: file_path, bloom_bytes)
    bloom = spark.read.parquet(f"{table_path}/_bloom")

    # Keep only files that MIGHT contain the key (false positives possible, never false negatives)
    candidate_files = (
        bloom
        .filter(bloom_might_contain(bloom["bloom_bytes"], lit(lookup_id)))
        .select("file_path")
        .collect()
    )

    # Read only those candidate files
    return spark.read.parquet(*[r.file_path for r in candidate_files])
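
For completeness, one possible pure-Python shape for the two pseudo helpers above (create_bloom / bloom_lookup); the sizes and hashing scheme here are arbitrary choices, and in practice a library such as pybloom-live, or Parquet's built-in Bloom filters, would be a more robust option:

import hashlib

BLOOM_BITS = 8 * 1024 * 8  # 8 KiB bit array per file (tune to your data volume)
NUM_HASHES = 7             # number of probes (tune for your target false positive rate)

def _probes(key, num_bits=BLOOM_BITS, k=NUM_HASHES):
    # Derive k probe positions from two hashes (double hashing)
    digest = hashlib.sha256(str(key).encode("utf-8")).digest()
    h1 = int.from_bytes(digest[:8], "big")
    h2 = int.from_bytes(digest[8:16], "big")
    return [(h1 + i * h2) % num_bits for i in range(k)]

def create_bloom(values, column=None):
    # Set the probe bits for every value in the file; return the bit array as bytes
    bits = bytearray(BLOOM_BITS // 8)
    for v in values:
        for pos in _probes(v):
            bits[pos // 8] |= 1 << (pos % 8)
    return bytes(bits)

def bloom_lookup(bloom_bytes, key):
    # True if the key MIGHT be present (all probed bits set), False if definitely absent
    return all(bloom_bytes[pos // 8] & (1 << (pos % 8)) for pos in _probes(key))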

Why both?
Zonemap → best for ranges (e.g., date/time, monotonically increasing IDs).
Bloom filter → best for point lookups (e.g., user_id = 123456).

Used together, they drastically shrink the scan set: the zonemap narrows by ranges, the Bloom filter narrows by exact keys.
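
Putting the two together could look like the sketch below; it reuses the _zonemaps and _bloom layouts and the bloom_might_contain UDF from above, and simply intersects the two candidate file lists:

from pyspark.sql.functions import lit

def apply_zonemap_then_bloom(table_path, date_start, date_end, lookup_id):
    zonemap = spark.read.parquet(f"{table_path}/_zonemaps")
    bloom = spark.read.parquet(f"{table_path}/_bloom")

    # 1) Range pruning via the zonemap
    range_files = zonemap.filter(
        (zonemap.min_date <= date_end) & (zonemap.max_date >= date_start)
    ).select("file_path")

    # 2) Point-lookup pruning via the Bloom index
    key_files = bloom.filter(
        bloom_might_contain(bloom["bloom_bytes"], lit(lookup_id))
    ).select("file_path")

    # 3) Scan only the files that survive both filters
    # (handle the empty-candidates case before reading in real code)
    candidates = [r.file_path for r in range_files.intersect(key_files).collect()]
    return spark.read.parquet(*candidates)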

Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa

WiliamRosa
New Contributor II

Thanks for sharing, @ck7007!

Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa

ck7007
New Contributor II

Thanks, glad you found it helpful.

Your detailed Bloom filter implementation above is fantastic, especially the RDD-based grouping approach for file-level filters. Have you measured the memory overhead in production?

I'm curious about your experience with false positive rates. We're currently at 0.01 but considering 0.001 for our high-value queries. What's worked best for you?
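
For reference, the standard sizing formulas show what that choice costs in memory; a rough sketch (not tied to any particular Bloom library):

import math

def bloom_size(n_items, fpp):
    # Classic sizing: m = -n*ln(p) / (ln 2)^2 bits, k = (m/n) * ln 2 hash functions
    m_bits = math.ceil(-n_items * math.log(fpp) / (math.log(2) ** 2))
    k_hashes = max(1, round((m_bits / n_items) * math.log(2)))
    return m_bits, k_hashes

# For ~1M keys per file: fpp=0.01 needs ~1.1 MiB per file, fpp=0.001 needs ~1.7 MiB per file
for fpp in (0.01, 0.001):
    bits, k = bloom_size(1_000_000, fpp)
    print(f"fpp={fpp}: {bits / 8 / 1024**2:.2f} MiB, {k} hash functions")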

Looking forward to more technical exchanges!

Isi
Honored Contributor II

Hi @ck7007, @WiliamRosa,

I have a question: why are you actually doing this? I'm not fully familiar with your exact setup (Iceberg), but my understanding is that Iceberg already stores these stats (min/max) in the manifests, and Spark should be able to leverage them to skip files that don't contain relevant data.

Could you share a bit more detail? For example:

  • Which engine version are you using?

  • Did you check the Spark physical plan to verify whether the manifest pruning is being applied correctly?

  • Is it possible that the query or engine isn't pushing down the filter as expected?

As I understand it, the manifest tree should already allow you to read only the necessary files without doing a full scan. So I'm not sure if this is an engine limitation, a query issue, or something else I'm missing.
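
For example, a quick way to check this (table and column names below are placeholders) is to look at the physical plan and at Iceberg's files metadata table, which already exposes per-file lower/upper bounds:

tbl = "my_catalog.db.events"  # placeholder table name

# 1) Does the scan report the pushed-down filter and a reduced file count?
spark.sql(f"EXPLAIN SELECT * FROM {tbl} WHERE event_date = DATE '2024-01-01'").show(truncate=False)

# 2) Iceberg already tracks per-file column bounds in its metadata
spark.table(f"{tbl}.files") \
    .select("file_path", "record_count", "lower_bounds", "upper_bounds") \
    .show(truncate=False)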

If you could provide more context, that would be really valuable for everyone here 🙂 Thanks!

Isi