Your problem statement is spot-on. The metadata contention issue is exactly what I encountered in production. Let's brainstorm solutions - I've tested several approaches:

Solution Architecture Options

1. Hierarchical Zonemap with Time-Based Partitioning

from collections import defaultdict

class HierarchicalZonemap:
    """
    Three-tier architecture: micro -> hourly -> daily
    """
    def __init__(self):
        # defaultdict(list) so the first append for a new bucket does not raise KeyError
        self.micro_zonemaps = defaultdict(list)  # Per micro-batch
        self.hourly_zonemaps = {}  # Consolidated hourly
        self.daily_master = {}  # Final master

    def write_micro_batch(self, batch_id, stats):
        # Write to isolated micro zonemap (no contention).
        # get_hour_bucket, should_consolidate and async_consolidate_hour are not shown here.
        timestamp_bucket = get_hour_bucket(batch_id)
        self.micro_zonemaps[timestamp_bucket].append(stats)

        # Async consolidation every hour
        if should_consolidate(timestamp_bucket):
            self.async_consolidate_hour(timestamp_bucket)

Pros: Zero write contention, predictable consolidation

Cons: Query complexity increases, 3-tier lookup overhead
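To make the 3-tier lookup cost concrete, here is a minimal sketch of consolidation and pruning, assuming each zonemap entry is a simple min/max pair for one numeric column. `ColumnStats`, `consolidate`, `hours_to_scan` and the hour-bucket strings are hypothetical names for illustration, not part of any Iceberg API.

```python
from collections import defaultdict
from dataclasses import dataclass


@dataclass
class ColumnStats:
    """Min/max summary for one column (hypothetical shape)."""
    min_value: float
    max_value: float

    def merge(self, other: "ColumnStats") -> "ColumnStats":
        # The merged zone must cover both inputs.
        return ColumnStats(min(self.min_value, other.min_value),
                           max(self.max_value, other.max_value))

    def may_contain(self, lo: float, hi: float) -> bool:
        # A zone can be skipped only if it lies entirely outside [lo, hi].
        return self.min_value <= hi and self.max_value >= lo


def consolidate(stats_list):
    """Fold a non-empty list of ColumnStats into one covering summary."""
    merged = stats_list[0]
    for s in stats_list[1:]:
        merged = merged.merge(s)
    return merged


# Micro tier: hour bucket -> list of per-batch stats.
micro = defaultdict(list)
micro["2024-01-01T10"].append(ColumnStats(5.0, 20.0))
micro["2024-01-01T10"].append(ColumnStats(1.0, 12.0))
micro["2024-01-01T11"].append(ColumnStats(30.0, 45.0))

# Hourly tier: one consolidated entry per hour.
hourly = {hour: consolidate(batches) for hour, batches in micro.items()}
# Daily tier: one entry covering every hour.
daily = consolidate(list(hourly.values()))


def hours_to_scan(lo, hi):
    """Check the daily summary first, then descend to hours only if it overlaps."""
    if not daily.may_contain(lo, hi):
        return []
    return sorted(h for h, s in hourly.items() if s.may_contain(lo, hi))
```

The daily check lets a non-matching query exit after one comparison, but a matching query pays for every tier it descends through, which is the lookup overhead listed under Cons.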

2. Dual-Index Pattern (My Current Approach)

class DualIndexStrategy:
    def __init__(self):
        # Plain dicts are placeholders; real index objects would expose query()
        self.hot_index = {}  # Last N hours (in-memory)
        self.cold_index = {}  # Historical (disk-based)

    def query_routing(self, time_predicate):
        # is_recent is a helper not shown here
        if is_recent(time_predicate):
            return self.hot_index.query()  # Fast path
        return self.cold_index.query()  # Standard path

Results from production:

  • Hot queries: 1.2s (vs 23s baseline)
  • Cold queries: 2.8s (acceptable)
  • Write throughput: 43K records/sec maintained
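One detail the routing above glosses over is a time range that straddles the hot/cold boundary. A minimal sketch of one way to handle that, assuming the predicate is a closed `[query_start, query_end]` range; `HOT_WINDOW` and `route` are hypothetical names, and the 6-hour value is only a placeholder for "last N hours":

```python
from datetime import datetime, timedelta, timezone

HOT_WINDOW = timedelta(hours=6)  # assumed value; stands in for "last N hours"


def route(query_start, query_end, now):
    """Return which indexes a [query_start, query_end] range has to consult."""
    hot_boundary = now - HOT_WINDOW
    targets = []
    if query_start < hot_boundary:
        targets.append("cold")  # part of the range predates the hot window
    if query_end >= hot_boundary:
        targets.append("hot")   # part of the range falls inside the hot window
    return targets


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
recent_only = route(now - timedelta(hours=1), now, now)
historical = route(now - timedelta(days=3), now - timedelta(days=2), now)
straddling = route(now - timedelta(hours=10), now, now)
```

A straddling query has to merge results from both indexes, so its latency is bounded by the cold path rather than the hot one.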

3. Probabilistic Merge Strategy

import random

def should_merge_zonemap(batch_id, file_count):
    """
    Probabilistically decide when to merge based on load
    """
    # Probability grows linearly with file count and saturates at 1000 files
    merge_probability = min(1.0, file_count / 1000)
    if random.random() < merge_probability:
        return True

    # Force merge at boundaries
    return batch_id % 100 == 0

This reduces contention by spreading merge operations randomly across batches.
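To get a feel for how often this rule fires, here is a small simulation sketch. It uses the same decision rule but takes the random generator as a parameter so runs are reproducible; the `rng` parameter and the `merge_rate` helper are additions for illustration only.

```python
import random


def should_merge_zonemap(batch_id, file_count, rng):
    """Same rule as above, with an injectable random.Random instance."""
    merge_probability = min(1.0, file_count / 1000)
    if rng.random() < merge_probability:
        return True
    # Forced merge every 100th batch.
    return batch_id % 100 == 0


def merge_rate(file_count, batches=10_000, seed=0):
    """Fraction of batches that trigger a merge at a fixed file count."""
    rng = random.Random(seed)
    merges = sum(
        should_merge_zonemap(batch_id, file_count, rng)
        for batch_id in range(1, batches + 1)
    )
    return merges / batches


rate_idle = merge_rate(0)        # only the forced boundary merges remain
rate_light = merge_rate(50)      # mostly random merges at p = 0.05
rate_saturated = merge_rate(1000)  # p = 1.0, so every batch merges
```

With p = min(1, file_count / 1000), the expected fraction of merging batches is about p + (1 - p) / 100. Note that once file_count reaches 1000 every batch merges, which brings back the per-batch contention the scheme is meant to avoid, so the threshold probably needs tuning against the real file counts.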

Critical Insight I Discovered

The real bottleneck isn't the zonemap update itself - it's the snapshot creation in Iceberg. Every micro-batch creates a new snapshot, and updating the zonemap reference in each snapshot causes lock contention.

Solution: Decouple Zonemap from Snapshots

class DecoupledZonemap:
    def __init__(self, table_path):
        # Store zonemaps separately from Iceberg metadata
        self.zonemap_path = f"{table_path}/_zonemaps_v2/"
        self.snapshot_mapping = {}  # Maps snapshot -> zonemap version

    def update(self, batch_data, snapshot_id):
        # Write zonemap independently (write_zonemap is not shown here)
        zonemap_version = self.write_zonemap(batch_data)

        # Update mapping outside the Iceberg commit path (no Iceberg lock needed);
        # shown synchronously here
        self.snapshot_mapping[snapshot_id] = zonemap_version
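A minimal sketch of what the missing write path could look like, assuming a local POSIX-style filesystem and a single writer. `FileBackedZonemapStore`, the JSON layout and the `snapshot_mapping.json` file name are all hypothetical; nothing here touches Iceberg's own metadata or API.

```python
import json
import os
import tempfile
import uuid


class FileBackedZonemapStore:
    """Hypothetical store: one immutable JSON file per zonemap version."""

    def __init__(self, table_path):
        self.zonemap_dir = os.path.join(table_path, "_zonemaps_v2")
        os.makedirs(self.zonemap_dir, exist_ok=True)
        self.mapping_path = os.path.join(self.zonemap_dir, "snapshot_mapping.json")

    def write_zonemap(self, column_stats):
        """Persist one zonemap and return its version id."""
        version = uuid.uuid4().hex
        self._atomic_write(os.path.join(self.zonemap_dir, f"{version}.json"), column_stats)
        return version

    def record_snapshot(self, snapshot_id, version):
        """Point a snapshot id at a zonemap version (read-modify-write, single writer)."""
        mapping = self.load_mapping()
        mapping[str(snapshot_id)] = version  # JSON object keys must be strings
        self._atomic_write(self.mapping_path, mapping)

    def load_mapping(self):
        if not os.path.exists(self.mapping_path):
            return {}
        with open(self.mapping_path) as f:
            return json.load(f)

    def _atomic_write(self, path, payload):
        # Write to a temp file in the same directory, then rename over the target
        # so readers never observe a half-written file.
        fd, tmp = tempfile.mkstemp(dir=self.zonemap_dir, suffix=".tmp")
        with os.fdopen(fd, "w") as f:
            json.dump(payload, f)
        os.replace(tmp, path)


with tempfile.TemporaryDirectory() as table_path:
    store = FileBackedZonemapStore(table_path)
    version = store.write_zonemap({"event_time": {"min": 100, "max": 200}})
    store.record_snapshot(42, version)
    mapping = store.load_mapping()
```

The rename trick relies on `os.replace` being atomic, which holds on local filesystems but not on object stores that lack a rename primitive, and the mapping update is not safe with concurrent writers. Both would need a different mechanism in a real deployment.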

     

Performance Comparison

Strategy                  | Write Impact    | Query Performance       | Complexity
Master Update Every Batch | -67% throughput | Best (2.1s)             | Low
Hierarchical              | -4% throughput  | Good (3.2s)             | High
Dual-Index                | -5% throughput  | Excellent (1.2s recent) | Medium
Decoupled                 | -2% throughput  | Good (2.8s)             | Medium

My Recommendation

Combine approaches 2 and 3 with the decoupled zonemap storage:

1. Use dual-index for time-based optimization
2. Use probabilistic merging for background consolidation
3. Keep zonemaps separate from Iceberg metadata

Open Questions for Collaboration

1. Compaction Strategy: How do you handle zonemap updates during file compaction? I'm seeing zonemap drift after OPTIMIZE operations.
2. Schema Evolution: When columns are added/dropped, rebuilding all zonemaps is expensive. Thoughts on incremental schema updates?
3. Memory Management: My hot index grows to ~2GB after 24 hours. Are you using off-heap memory or spillable data structures?
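On the memory question, one possible starting point is to cap the hot index by bucket count and spill the oldest bucket to the cold side. This is only a sketch, assuming buckets arrive roughly in time order so insertion order tracks age; `BoundedHotIndex` is a hypothetical name and the cold index is modelled as a plain dict rather than a disk-based structure.

```python
from collections import OrderedDict


class BoundedHotIndex:
    """Keeps at most max_buckets hour buckets in memory, spilling the oldest."""

    def __init__(self, max_buckets, cold_index):
        self.max_buckets = max_buckets
        self.cold_index = cold_index  # dict-like sink standing in for the disk index
        self.buckets = OrderedDict()  # hour bucket -> list of stats, oldest first

    def add(self, hour_bucket, stats):
        self.buckets.setdefault(hour_bucket, []).append(stats)
        # Evict from the oldest end until the cap is respected again.
        while len(self.buckets) > self.max_buckets:
            old_bucket, old_stats = self.buckets.popitem(last=False)
            self.cold_index.setdefault(old_bucket, []).extend(old_stats)


cold = {}
hot = BoundedHotIndex(max_buckets=2, cold_index=cold)
hot.add("h1", {"min": 1, "max": 5})
hot.add("h2", {"min": 3, "max": 9})
hot.add("h3", {"min": 7, "max": 8})
```

This bounds the number of buckets, not bytes, so it only caps memory if per-bucket size is reasonably stable. A byte-based budget would need size accounting on each `add`.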

Want to set up a shared repo to test these approaches? I can contribute my dual-index implementation and benchmark harness. What specific streaming workload characteristics are you optimizing for?