Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
uday_satapathy
Databricks Employee

Summary

  • In production machine learning systems (like real-time recommenders or fraud detection), models operate under a strict latency budget (typically milliseconds end-to-end). Within this window, the model must fetch features, run inference, and return a decision.
  • Traditional analytical engines are optimized for high-throughput columnar scans, making them poorly suited for sub-millisecond point lookups. Conversely, maintaining separate, custom-built operational databases for online serving introduces massive engineering overhead and risks online-offline feature skew—where the features served at inference differ from those used during training.
  • This blog highlights Lakebase as the low-latency online serving layer for the Databricks Online Feature Store. It details how to bridge the gap between offline training data in Unity Catalog and online models using three core ingestion and serving patterns, mapping them to a "Freshness Ladder" (from days-fresh to sub-second).

 

Databricks Lakebase is a serverless Postgres database built directly into the Databricks Data Intelligence Platform. Beyond running standard transactional apps, it powers the Databricks Online Feature Store, acting as the low-latency serving layer that holds machine learning features for your production models. This integration links your offline training data directly to your online models without the operational headache of managing separate database systems.

To populate the store, you just sync your offline feature tables from Unity Catalog. Databricks replicates the data to Lakebase automatically. You can set the sync to run on a schedule or stream updates continuously, which saves you from writing and maintaining custom data pipelines.

Recommender Systems and Feature Stores

When a recommender system is serving a live shopper, it has roughly two hundred milliseconds to fetch features, run inference, and return a decision. In that window, the model must look up who the user is, what they just clicked, and what's trending right now, then combine those signals into a ranked result. If any of those inputs (the user's profile, the item catalog, the session counters) lags behind reality, the model ranks against a world that no longer exists. And if the lookups themselves are slow, so is the page.

Getting fresh, governed feature values into the scoring call without exceeding that budget is an engineering challenge. This blog covers how to solve it using Lakebase, the managed Postgres database in the Databricks Data Intelligence Platform.

Lakebase supports multiple ingestion patterns for writing features and multiple read patterns for serving them. The right combination depends on how fresh your data needs to be and how your scoring endpoint is built. We'll walk through three patterns, each making a different trade-off between operational governance, complexity, and latency. In practice, real recommenders, fraud systems, and personalization pipelines rarely commit to just one. They mix and match across all three, feature by feature.


Figure 1. Inference flow with a ~200 ms end-to-end latency budget, showing where the Lakebase feature lookup sits.

The Case for an Online Feature Store

It's worth being explicit about why a feature store sits in the stack at all before getting into how Lakebase serves one. A feature store gives teams a governed place to define a feature once and reuse it across projects, with versioning, access control, and lineage handled in Unity Catalog rather than reinvented per model. It also means each feature is computed ahead of time and looked up at inference, so the scoring path reads a stored value instead of recomputing aggregates on every request. The property that matters most for a live model is consistency: the values served at inference come from the same definitions that produced the training data, which prevents the online-offline skew that slowly erodes model quality.

Most teams already store features in Unity Catalog as Delta tables, for example country, lifetime spend, or a daily popularity score per item. These features are then used for training, analytics, and reporting. That's the offline feature store: governed, versioned, and usually queried by a SQL warehouse or a Spark job that scans columns across millions of rows. Those compute engines, optimized to run analytical workloads, excel at heavy columnar scans but struggle with the point lookup ("give me the feature row for the user with user_id 12345 right now") that a scoring endpoint makes once per request. The engine optimizes for throughput, while a sub-millisecond key lookup requires a different database shape.

Lakebase is the online feature store of the Databricks Data Intelligence Platform, designed specifically for low-latency feature serving at inference time. The FeatureEngineeringClient in the databricks-feature-engineering library exposes a publishing API that continuously syncs a Unity Catalog Delta feature table to Lakebase through a managed streaming pipeline that Databricks runs on your behalf.

The Freshness Ladder

A recommender system draws on two broad classes of features, split by how fast they change.

  1. Facts that change slowly, e.g. a user's country, account age, or preferred genre; an item's category, price tier, average rating, or baseline popularity. These describe who someone is or what sits in the catalog, and they stay valid for hours or days.
  2. Signals that change by the second, e.g. items added in the current session, last action timestamp, click velocity, or what's trending across the last few minutes. These describe what is happening right now, and lose their meaning almost as fast as they are created.

The first class tolerates features that are hours or even a day old. The second loses its value the moment it falls more than a few seconds behind.

Freshness is not free. A daily refresh is a single batch job. A per-minute refresh requires a streaming pipeline that runs continuously. Per-event state in near-real-time adds a custom streaming writer and a Lakebase instance sized for sustained write throughput. The engineering decision for every feature is the same: weigh the cost of keeping it fresh against the performance cost of letting it lag.

Three patterns cover most of what teams build on Lakebase.

  • Managed Publish: Take a governed Delta feature table, point fe.publish_table at it, and Databricks runs a managed sync into Lakebase on your behalf. This pattern is the natural fit for batch features and minute-fresh signals where the source of truth already lives in Unity Catalog.
  • Declarative Window: Define windowed aggregates once (e.g. events in the last hour, distinct items in the last day), and Databricks handles both the offline materialization in Unity Catalog and the Lakebase sync from that single definition. The same declarative definition extends to a Kafka source through Streaming Declarative Features (Private Preview), which pushes the managed path to sub-second freshness. Note that RollingWindow aggregates over a Delta source are computed on-the-fly at training time for point-in-time correctness, so no serving copy is written to Lakebase.
  • Direct Stream: Run your own streaming job that writes seconds-fresh, per-user state directly to Lakebase, either through the native PostgreSQL Sink or through a foreach writer. This pattern is the right choice for hot operational state where freshness is measured in seconds, not minutes.


Figure 2. The freshness ladder: feature freshness ranging from days-fresh to sub-second, each rung mapped to its serving pattern on Lakebase.

  • Days or hours fresh:  User demographics, item catalog metadata, precomputed embeddings. These are inexpensive to refresh on a schedule and typically go through Managed Publish on a scheduled cadence, or Declarative Window with a coarse cron.
  • Minutes fresh:  Hourly item popularity, last-day aggregate signals. Pre-built feature tables use Managed Publish in continuous mode; windowed aggregates use Declarative Window with materialize_features(online_config=...) on a tight cron.
  • Seconds fresh:  Session counters, last-event state, per-user velocity signals. These go through Direct Stream.
  • Sub-second fresh:  Live windowed counts that feed model scoring the instant they change. On the managed side, this is achievable today via Declarative Window's Streaming Declarative Features on Kafka. On the custom-code side, Direct Stream's native PostgreSQL Sink combined with Real-Time Mode on DBR 18.0+ gets you there.

No single pattern satisfies governance, sub-second freshness, and low operational overhead at once. Production pipelines mix and match across all four rungs of the ladder, choosing the right pattern feature by feature.
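
The ladder can be read as a routing rule applied feature by feature. The sketch below is a minimal illustration, not part of any Databricks API: the helper name choose_pattern, the one-second and one-minute thresholds, and the use of a windowed_aggregate flag to split the managed and custom paths are all assumptions made for the example.

```python
from datetime import timedelta


def choose_pattern(max_staleness: timedelta, windowed_aggregate: bool = False) -> str:
    """Map a feature's tolerated staleness to a rung on the freshness ladder.

    Thresholds are illustrative assumptions, not platform limits.
    """
    if max_staleness < timedelta(seconds=1):
        # Sub-second rung: managed streaming features on Kafka,
        # or the PostgreSQL Sink with Real-Time Mode for custom code.
        if windowed_aggregate:
            return "Declarative Window (streaming)"
        return "Direct Stream (Real-Time Mode)"
    if max_staleness < timedelta(minutes=1):
        # Seconds-fresh rung: session counters, last-event state.
        return "Direct Stream"
    # Minutes, hours, or days: the managed paths are enough.
    return "Declarative Window" if windowed_aggregate else "Managed Publish"


# Example routing for three signals from a recommender.
routes = {
    "user_country": choose_pattern(timedelta(days=1)),
    "item_views_last_hour": choose_pattern(timedelta(minutes=5), windowed_aggregate=True),
    "session_click_count": choose_pattern(timedelta(seconds=5)),
}
```

Writing the rule down this way forces each feature to declare how stale it may be, which is the decision the rest of this post keeps coming back to.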

Managed Publish: offline features to the Online Feature Store

Managed Publish covers most common use cases. Any feature your team would put in a Unity Catalog Delta table (and most should) can be published to Lakebase with fe.publish_table. You get UC lineage, access control, time-travel, and a managed sync pipeline. You pick the freshness with the publish_mode argument.

from databricks.feature_engineering import FeatureEngineeringClient

fe = FeatureEngineeringClient()
online_store = fe.get_online_store(name="my-online-store")

fe.publish_table(
    online_store=online_store,
    source_table_name="main.rec.user_profile_features",
    online_table_name="main.rec.user_profile_features_online",
    publish_mode="CONTINUOUS",
)

Three modes:

  • TRIGGERED: incremental sync on demand or on a schedule, with minute-to-hour cadence and the lowest cost.
  • CONTINUOUS: a managed Lakeflow streaming pipeline tails Change Data Feed on the source and upserts to Lakebase continuously, providing minute-fresh reads.
  • SNAPSHOT: full rebuild each run, suitable for small lookup tables.

Pick the mode by freshness requirement: TRIGGERED for a daily-refreshed user profile, CONTINUOUS for hourly item popularity, SNAPSHOT for a small static lookup.
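
When several tables need publishing, it can help to keep the mode choice in one reviewable place. The following is a sketch under assumptions: the table names other than main.rec.user_profile_features are hypothetical, publish_kwargs is a helper invented for this example, and the "_online" naming convention simply mirrors the call shown earlier. The code only builds keyword arguments; it does not call Databricks.

```python
VALID_MODES = {"TRIGGERED", "CONTINUOUS", "SNAPSHOT"}

# Hypothetical feature tables paired with the mode their freshness needs suggest.
PUBLISH_PLAN = {
    "main.rec.user_profile_features": "TRIGGERED",       # daily-refreshed profile
    "main.rec.item_popularity_features": "CONTINUOUS",   # hourly popularity, minute-fresh reads
    "main.rec.country_lookup": "SNAPSHOT",               # small static lookup
}


def publish_kwargs(source_table: str, mode: str) -> dict:
    """Build the keyword arguments for one publish call, rejecting unknown modes."""
    if mode not in VALID_MODES:
        raise ValueError(f"unknown publish_mode {mode!r}; expected one of {sorted(VALID_MODES)}")
    return {
        "source_table_name": source_table,
        "online_table_name": f"{source_table}_online",
        "publish_mode": mode,
    }


calls = [publish_kwargs(table, mode) for table, mode in PUBLISH_PLAN.items()]
# In a Databricks notebook, each dict could then be passed along as
# fe.publish_table(online_store=online_store, **kwargs).
```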

Declarative Window: aggregates that train and serve from one definition

Declarative Window is the right pattern when a feature is a windowed aggregate over a raw event source. Think "sum of transaction amount per user over the last seven days" or "count of orders per user on a sliding seven-day window."

You declare the aggregate once in Python against a Unity Catalog source table. From that single materialize_features call, Databricks does three things: computes the aggregate, lands it in Unity Catalog as a Delta table for offline use, and syncs it to Lakebase as an online table for serving. The same definition drives both the point-in-time-correct training set and the low-latency lookup at inference time, with no separate publish_table step required.

Declarative Window is in Beta and requires databricks-feature-engineering >= 0.15.0 and DBR 17.0 ML or newer (Serverless is fine). The code below targets the SDK 0.15.0 API shape; earlier shapes used fe.create_feature / ContinuousWindow / entity_columns.

Materializable windows: SlidingWindow and TumblingWindow

SlidingWindow and TumblingWindow are the two windows the platform will materialize for you. The aggregation function, the window, and the source come together on a single Feature, and materialize_features(features=..., offline_config=..., online_config=...) does the rest:

from databricks.feature_engineering import FeatureEngineeringClient
from databricks.feature_engineering.entities import (
    Feature, DeltaTableSource, AggregationFunction,
    Sum, Count,
    SlidingWindow, TumblingWindow,
    OfflineStoreConfig, OnlineStoreConfig, CronSchedule,
)
from databricks.sdk.service.ml import MaterializedFeaturePipelineScheduleState
from datetime import timedelta

fe = FeatureEngineeringClient()
tx_source = DeltaTableSource(
    catalog_name="main", schema_name="rec", table_name="transactions",
)
amount_sum = Feature(
    source=tx_source,
    entity=["user_id"],
    timeseries_column="transaction_time",
    function=AggregationFunction(
        Sum(input="amount"),
        TumblingWindow(window_duration=timedelta(days=7)),
    ),
    name="amount_sum_tumbling_7d",
)

order_count = Feature(
    source=tx_source,
    entity=["user_id"],
    timeseries_column="transaction_time",
    function=AggregationFunction(
        Count(input="transaction_id"),
        SlidingWindow(
            window_duration=timedelta(days=7),
            slide_duration=timedelta(days=1),
        ),
    ),
    name="order_count_sliding_7d_1d",
)

registered = [
    fe.register_feature(feature=f, catalog_name="main", schema_name="rec")
    for f in (amount_sum, order_count)
]

fe.materialize_features(
    features=registered,
    offline_config=OfflineStoreConfig(
        catalog_name="main", schema_name="rec",
        table_name_prefix="declfeat_offline",
    ),
    online_config=OnlineStoreConfig(
        catalog_name="main", schema_name="rec",
        table_name_prefix="declfeat_online",
        online_store_name="my-online-store",
    ),
    trigger=CronSchedule(
        quartz_cron_expression="0 */5 * * * ?",
        timezone_id="UTC",
        pipeline_schedule_state=MaterializedFeaturePipelineScheduleState.ACTIVE,
    ),
)

For every registered feature, Databricks automatically sets up a UC Delta table under the offline prefix (declfeat_offline here) and a synced Lakebase table under the online prefix (declfeat_online here). The refresh frequency is determined by the cron schedule, which effectively sets the floor for data freshness. For example, with a source of 50,000 transaction rows, a single run of the 0 */5 * * * ? schedule (every five minutes) completed the offline table writes in roughly two minutes. Sliding-window features generate more rows than tumbling ones because each daily slide creates its own entry. The corresponding Lakebase synced tables, each containing 2,000 rows keyed by user_id, appeared approximately forty seconds later. You can query this online data as a standard Postgres table, for example with the psycopg library. Sample output from such a query:

Lakebase declfeat_online: 2,000 rows
  cols: ['user_id', 'amount_sum_tumbling_7d']
  row : (0, 690.86)
  row : (1, 271.39)
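
The row-count difference between the two window types follows from how many windows a single event falls into. Here is a minimal pure-Python sketch of that arithmetic. It assumes windows aligned to the Unix epoch, which may not match the platform's actual alignment, and window_starts is a hypothetical helper, not a Databricks function.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_starts(event_time, window, slide):
    """Return the start of every [start, start + window) interval containing event_time.

    A tumbling window is the special case where slide == window.
    """
    # Latest slide-aligned start at or before the event.
    start = event_time - ((event_time - EPOCH) % slide)
    starts = []
    # Walk backwards one slide at a time while the window still covers the event.
    while start > event_time - window:
        starts.append(start)
        start -= slide
    return starts


event = datetime(2025, 1, 15, 12, 30, tzinfo=timezone.utc)
tumbling = window_starts(event, timedelta(days=7), timedelta(days=7))
sliding = window_starts(event, timedelta(days=7), timedelta(days=1))
print(len(tumbling), len(sliding))
```

With a seven-day window, one event lands in a single tumbling window but in seven sliding windows when the slide is one day, which is why the sliding feature produces more offline rows from the same source.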

Streaming features from Kafka

Everything covered so far lands data on a fixed schedule, which works well when the source is a Delta table updated in batch. When the source is a Kafka topic and the model needs the most recent few seconds of activity, the same Declarative API extends to streaming.

Streaming Declarative Features (Private Preview) lets you declare a feature with a KafkaConfig source. Databricks then runs a managed pipeline that reads the topic, computes the aggregate, and writes to Lakebase, with end-to-end freshness typically in the sub-second range. RollingWindow (renamed from ContinuousWindow in SDK 0.15.0) is the window type that fits here, since trailing aggregates over a streaming topic have no batch boundaries. The Feature shape is identical to the Delta path described above; what changes is the source.

Because the same Feature definition drives both training and serving, the windowed aggregate the platform writes to Lakebase matches the training-side math exactly. The property that makes the Delta path appealing carries over to Kafka.

To try it, get a customer workspace allowlisted through the preview request portal and follow the streaming declarative features docs.

Direct Stream: streaming writes to Lakebase


Figure 3. Three patterns feeding one Lakebase online store. Direct Stream shows the native PostgreSQL Sink alongside the `foreach` control path for per-partition parallel writes and custom merge logic.

Some features are too dynamic for any managed publish pipeline: counts of what a user did in the current session, how fast they are clicking, when their last action happened. Recomputing these from a Delta table on a one-minute cadence is too slow for inference, and their per-user granularity makes batch aggregation across all users wasteful. Direct Stream is built for features like these.

The pattern is straightforward: you write seconds-fresh state directly to Lakebase from a streaming job. Two paths are available.

The PostgreSQL Sink handles row-for-row replication into Lakebase with managed upserts, automatic credential refresh, and deadlock-safe ordering. It is the right choice when you want Databricks to manage the write mechanics.

foreach gives you full control over the write, with one Lakebase connection per Spark partition and custom upsert SQL running in parallel across executors. It is the right choice when the write logic is non-trivial or you want the parallelism of executor-side writes.

The two paths handle different shapes of problems, and production pipelines often use both.

The native path: PostgreSQL Sink

Structured Streaming has a native Postgres Sink. Batching, automatic retries, Lakebase credential refresh every microbatch, and deadlock-safe row ordering are all handled under the hood:

(spark.readStream.format("delta").table("main.rec.clickstream_events")
      .writeStream
      .format("postgresql")
      .option("endpoint", "my-lakebase-endpoint")
      .option("database", "databricks_postgres")
      .option("dbtable", "rec.session_state")
      .option("upsertkey", "user_id")
      .option("checkpointLocation", "/Volumes/main/rec/ckpt/direct_stream")
      .option("batchinterval", "50 milliseconds")
      .outputMode("update")
      .trigger(processingTime="5 seconds")
      .start())

Under the hood, the sink issues INSERT ... ON CONFLICT (upsert_key) DO UPDATE SET ..., sorts rows by upsert key within each batch to avoid deadlocks, and refreshes the Lakebase OAuth token every microbatch so long-running queries don't fail on token expiry.
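
To make those mechanics concrete, here is a minimal sketch of the kind of statement and ordering just described. It illustrates the technique and is not the sink's actual implementation: build_upsert_sql and order_batch are hypothetical helpers, and the column names are assumptions. Sorting matters because two transactions that lock the same rows in opposite orders can deadlock; a consistent key order removes that possibility.

```python
def build_upsert_sql(table, columns, upsert_key):
    """Render an INSERT ... ON CONFLICT ... DO UPDATE statement.

    Identifiers are interpolated directly, so they must come from trusted
    configuration, never from user input. Values use %s placeholders.
    """
    non_key = [c for c in columns if c != upsert_key]
    placeholders = ", ".join(["%s"] * len(columns))
    updates = ", ".join(f"{c} = EXCLUDED.{c}" for c in non_key)
    return (
        f"INSERT INTO {table} ({', '.join(columns)}) "
        f"VALUES ({placeholders}) "
        f"ON CONFLICT ({upsert_key}) DO UPDATE SET {updates}"
    )


def order_batch(rows, key_index=0):
    """Sort rows by upsert key so concurrent writers lock rows in the same order."""
    return sorted(rows, key=lambda row: row[key_index])


sql = build_upsert_sql(
    "rec.session_state",
    ["user_id", "events_in_session", "last_event_ts"],  # assumed columns
    "user_id",
)
batch = order_batch([(42, 3, "12:00:05"), (7, 1, "12:00:01"), (19, 8, "12:00:03")])
```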

Supported triggers: Real-Time Mode, ProcessingTime, AvailableNow, Once. Real-Time Mode and this sink compose on DBR 18.0+, so the sub-second freshness floor from the ladder is reachable today on dedicated clusters.

The control path: foreach for per-partition parallel writes

The native sink is a row-for-row replicator: stream a DataFrame in, get the same rows out into Lakebase. The moment your write needs more than a single INSERT ... ON CONFLICT DO UPDATE template, or you want the parallelism of executor-side writes, it falls outside the native sink and belongs in foreach. The stateful work (collecting the last ten actions per user, counting distinct products viewed in the last minute, computing per-user velocity) still happens upstream in Structured Streaming. foreach is just the sink that lands those windowed rows.

foreach works best whenever the write needs more than a row-by-row copy:

  • Executor-parallel writes. Each Spark partition opens its own Lakebase connection and writes concurrently. N partitions means N parallel write streams, with no driver-side toLocalIterator() bottleneck.
  • Custom merge semantics. Counter increments, max-of-timestamps, server-side now(). Anything beyond INSERT ... ON CONFLICT DO UPDATE SET col = EXCLUDED.col runs inside process().
  • Buffered flushes. The writer batches rows on the executor and calls executemany once per flush, getting the throughput of batch writes without paying for a driver round-trip.

Here's the shape, drawn from a production Lakebase pipeline that maintains two features per user (the last ten actions, and a count of distinct products viewed) keyed on a one-minute tumbling window. The streaming aggregation does the heavy lifting in Spark:

from pyspark.sql import functions as F
from pyspark.sql.functions import (
    window, collect_list, struct, expr, approx_count_distinct, when,
)

result = (
    events
      .withWatermark("event_ts", "1 minute")
      .groupBy(F.col("user_id"), window(F.col("event_ts"), "1 minute"))
      .agg(
          collect_list(struct("event_ts", "action", "product_id")).alias("actions"),
          approx_count_distinct(
              when(F.col("action") == "view_pdp", F.col("product_id"))
          ).alias("distinct_products_viewed"),
      )
      .select(
          F.col("user_id"),
          F.col("window.start").alias("window_start"),
          F.col("window.end").alias("window_end"),
          expr("""
              transform(
                slice(sort_array(actions, false), 1, 10),
                x -> concat(x.action, ':', x.product_id, ':', cast(x.event_ts as string))
              )
          """).alias("user_last_10_actions"),
          F.col("distinct_products_viewed"),
      )
)
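
For readers less familiar with Spark SQL's higher-order functions, the expr step above corresponds roughly to the plain-Python logic below, applied to one user's window. This is a sketch for intuition only: last_10_actions is a hypothetical helper, and it assumes timestamps render the way str(datetime) does, which may differ from Spark's cast formatting for fractional seconds.

```python
from datetime import datetime


def last_10_actions(actions):
    """actions: list of (event_ts, action, product_id) tuples for one user and window."""
    # sort_array(actions, false): descending order. Structs compare field by
    # field, so event_ts (the first field) decides the order.
    newest_first = sorted(actions, reverse=True)
    # slice(..., 1, 10) keeps at most the ten newest entries;
    # transform(...) renders each as "action:product_id:event_ts".
    return [f"{action}:{product_id}:{ts}" for ts, action, product_id in newest_first[:10]]


# Twelve synthetic events, one per second.
sample = [(datetime(2025, 1, 1, 0, 0, i), "view_pdp", f"p{i}") for i in range(12)]
encoded = last_10_actions(sample)
```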

The writer opens one psycopg3 connection per partition, mints a fresh OAuth token via the Databricks SDK, and upserts the windowed rows with INSERT ... ON CONFLICT DO UPDATE:

import psycopg
from databricks.sdk import WorkspaceClient

class LakebaseForeachWriter:
    UPSERT_SQL = """
        INSERT INTO public.user_feature_store
            (user_id, window_start, window_end,
             user_last_10_actions, distinct_products_viewed)
        VALUES (%s, %s, %s, %s, %s)
        ON CONFLICT (user_id, window_start, window_end)
        DO UPDATE SET
            user_last_10_actions     = EXCLUDED.user_last_10_actions,
            distinct_products_viewed = EXCLUDED.distinct_products_viewed,
            updated_at               = now();
    """

    def __init__(self, endpoint_name, db_name, flush_size=500):
        self.endpoint_name = endpoint_name
        self.db_name = db_name
        self.flush_size = flush_size

    def open(self, partition_id, epoch_id):
        w = WorkspaceClient()
        endpoint = w.postgres.get_endpoint(name=self.endpoint_name)
        cred = w.postgres.generate_database_credential(endpoint=self.endpoint_name)
        self.conn = psycopg.connect(
            host=endpoint.status.hosts.host,
            port=5432,
            dbname=self.db_name,
            user=w.current_user.me().user_name,
            password=cred.token,
            sslmode="require",
        )

        self.buffer = []
        return True

    def process(self, row):
        self.buffer.append((
            row["user_id"],
            row["window_start"],
            row["window_end"],
            list(row["user_last_10_actions"]),
            int(row["distinct_products_viewed"]),
        ))

        if len(self.buffer) >= self.flush_size:
            self._flush()

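    def close(self, error):
        # Flush leftovers only on success; always release the connection,
        # even if the final flush raises.
        try:
            if error is None and self.buffer:
                self._flush()
        finally:
            self.conn.close()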

    def _flush(self):
        with self.conn.cursor() as cur:
            cur.executemany(self.UPSERT_SQL, self.buffer)

        self.conn.commit()
        self.buffer.clear()

(result.writeStream
       .outputMode("update")
       .foreach(LakebaseForeachWriter(ENDPOINT_NAME, DB_NAME))
       .option("checkpointLocation", CHECKPOINT_LOCATION)
       .trigger(processingTime="15 seconds")
       .start())

A few notes on the writer:

  • The foreach writer contract is a plain Python class with three methods: open() (optional), process() (required), and close() (optional).
  • Each open() runs on an executor, opening one psycopg3 connection per partition per epoch. There is no driver-side collect. For sustained high-throughput, replace the per-epoch open with a long-lived ConnectionPool per executor.
  • psycopg3's binary protocol handles TEXT[] natively, which matters because user_last_10_actions is an array column.
  • The OAuth token is regenerated on every open() call. Long-running streams don't fail on token expiry.
  • ON CONFLICT DO UPDATE keyed on (user_id, window_start, window_end) makes retries idempotent. Re-running an epoch over the same window produces the same final row.
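
The idempotency point in the last note can be checked locally. The sketch below uses Python's built-in sqlite3 module as a stand-in for Lakebase, which is an assumption made for portability: SQLite 3.24+ accepts a compatible ON CONFLICT ... DO UPDATE clause, but Postgres-specific pieces such as TEXT[] columns and now() are left out, and the table is trimmed to one feature column.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE user_feature_store (
        user_id INTEGER,
        window_start TEXT,
        window_end TEXT,
        distinct_products_viewed INTEGER,
        PRIMARY KEY (user_id, window_start, window_end)
    )
""")

UPSERT_SQL = """
    INSERT INTO user_feature_store
        (user_id, window_start, window_end, distinct_products_viewed)
    VALUES (?, ?, ?, ?)
    ON CONFLICT (user_id, window_start, window_end)
    DO UPDATE SET distinct_products_viewed = excluded.distinct_products_viewed
"""

epoch_rows = [(1, "12:00", "12:01", 3), (2, "12:00", "12:01", 5)]


def write_epoch(rows):
    """Upsert one epoch's rows and commit, as the foreach writer does on flush."""
    conn.executemany(UPSERT_SQL, rows)
    conn.commit()


write_epoch(epoch_rows)
write_epoch(epoch_rows)  # simulated retry of the same epoch

final_rows = conn.execute(
    "SELECT * FROM user_feature_store ORDER BY user_id"
).fetchall()
```

After the retry the table still holds one row per key with the same values, which is what lets Structured Streaming replay an epoch safely.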

One final architectural constraint: Training and serving must agree. When using Direct Stream, writes hit Lakebase directly and bypass the offline store, which means the streaming logic and training pipeline can drift. If the training math for "events in the last minute" deviates even slightly from the streaming job, the model encounters a different distribution in production than it did during development. You must pin the logic down in one place. For greater robustness, have your streaming writer append those same feature rows to a Unity Catalog Delta table. This ensures your training sets are composed of the exact values served online, rather than values re-derived after the fact. (This consistency is precisely what Declarative Window automates for SlidingWindow and TumblingWindow shapes; for custom shapes beyond those, you are responsible for maintaining that parity.)
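
One way to pin the logic down in one place is to define the feature math as a single function that both the training backfill and the streaming job import. The sketch below is a simplified illustration with a hypothetical events_in_last_minute helper; the half-open interval is an arbitrary choice for the example, and the point is that whichever boundary rule you choose exists exactly once.

```python
from datetime import datetime, timedelta


def events_in_last_minute(event_times, as_of):
    """Count events in the half-open interval (as_of - 60s, as_of]."""
    lower = as_of - timedelta(minutes=1)
    return sum(1 for t in event_times if lower < t <= as_of)


as_of = datetime(2025, 1, 1, 12, 1, 0)
history = [
    datetime(2025, 1, 1, 12, 0, 0),   # exactly 60s old: excluded by the open lower bound
    datetime(2025, 1, 1, 12, 0, 30),  # included
    datetime(2025, 1, 1, 12, 1, 0),   # exactly at as_of: included
]

# Both paths call the same function, so the boundary rule cannot drift.
training_value = events_in_last_minute(history, as_of)
serving_value = events_in_last_minute(history, as_of)
```

An event that is exactly sixty seconds old is the kind of edge case where two independent implementations tend to disagree, producing skew that is hard to spot in aggregate metrics.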

Putting it together: the hybrid recommender

A production recommender typically uses all three patterns simultaneously, assigning each signal to the pattern that best matches its freshness and governance requirements.


Figure 4. Hybrid recommender architecture: user-profile features via Managed Publish, item-popularity windowed aggregates via Declarative Window, session state via Direct Stream, training via the same Declarative Window features plus on-the-fly `RollingWindow` aggregates, all joined by a single Model Serving endpoint.

  • User profile features (country, lifetime events, preferred genre) live in UC Delta and publish to Lakebase via Managed Publish in CONTINUOUS mode, which provides minute-fresh serving.
  • Item popularity (views-last-hour, average rating) can take either of two routes. If a feature table already exists, batch-aggregate it into UC Delta and publish via Managed Publish. If it's defined as a windowed aggregate over the raw interactions table, declare it as a SlidingWindow and let Declarative Window materialize the offline Delta and the Lakebase synced table from one definition. Either way it lands minute-fresh online; pick by whether you want to own the aggregation code or let the platform own it.
  • Session state (events this session, distinct items seen, last event time) lives in a Direct Stream writer keyed on user_id, providing seconds-fresh reads. Because each write aggregates raw events into windowed state before the upsert, this is a foreach case.
  • Training sets use Managed Publish features via FeatureLookup, materialized Declarative Window aggregates (SlidingWindow and TumblingWindow) for the windowed signals that also serve online, plus on-the-fly RollingWindow aggregates for labels that need trailing-window correctness while omitting an online copy.

At inference time, one Model Serving endpoint reads a FeatureSpec that joins the three online sources. A single request fans out into three parallel reads against Lakebase: one feature lookup on user_id, one on item_id, and one on user_id again for session state, followed by scoring and the response. Because the reads run in parallel, the slowest of the three (the tail lookup) dominates the latency budget; the rest is scoring and network.
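
A small simulation shows why the slowest read, rather than the sum of all three, sets the feature-fetch time. This is a sketch with made-up latencies and a hypothetical lookup function standing in for a Lakebase read; it does not call Model Serving or Lakebase.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Assumed per-lookup latencies in seconds, exaggerated so the effect is visible.
SIMULATED_LATENCY_S = {
    "user_profile": 0.10,
    "item_popularity": 0.10,
    "session_state": 0.20,
}


def lookup(name):
    """Stand-in for one online feature lookup."""
    time.sleep(SIMULATED_LATENCY_S[name])
    return name, {"latency_s": SIMULATED_LATENCY_S[name]}


def fetch_features():
    """Issue all three lookups concurrently and collect the results by name."""
    with ThreadPoolExecutor(max_workers=3) as pool:
        return dict(pool.map(lookup, SIMULATED_LATENCY_S))


start = time.perf_counter()
features = fetch_features()
elapsed = time.perf_counter() - start
print(f"fetched {len(features)} feature groups in {elapsed:.2f}s")
```

The elapsed time tracks the slowest lookup rather than the 0.40 s a sequential fetch would take, so tuning effort is best spent on the tail.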


Weigh the tradeoffs before you pick

Managed Publish carries the lowest operational cost, with minute-fresh as its ceiling. Declarative Window lets a windowed aggregate train and serve from a single definition, with the cron cadence setting the cost; RollingWindow handles the trailing aggregates that don't need an online copy. Direct Stream gets you seconds-fresh data at the price of the most engineering work to own. Production recommenders run all three at once, and the design call for every feature is the same: weigh the cost of keeping it fresh against the cost of letting it lag, and place it accordingly.

One note before you start building: the native PostgreSQL Sink and Streaming Declarative Features on Kafka are both in Preview today. If you build with Direct Stream now, keep your Lakebase schemas and primary keys stable so the managed path can replace your code without touching the inference layer.

A snappy recommendation page, like showing running socks the moment someone adds shoes to their cart, happens because someone decided that "items clicked in this session" had to be seconds-fresh while "lifetime average order value" could sit a day stale, and then routed each signal accordingly. That routing decision, signal by signal, is where the work happens. If you skip that decision, you either pay to refresh things that don't need refreshing or ship a recommendation that lags behind the customer.

Conclusion

A 200-millisecond latency budget means you have to match each feature to the right lookup mechanism. There's no reason to update stable user profiles with the same frequency as sub-second session counters.

With Databricks Lakebase, you can choose from three online serving paths depending on your latency needs. Managed Publish handles minute-level updates. For consistent windowed aggregates, you can use Declarative Window. And when you need seconds-fresh operational state, Direct Stream writes directly to the serving layer.

The hard engineering work is the routing: every feature requires a deliberate choice that weighs the cost of real-time processing against the performance impact of data lag. Getting this right lets you run models on active, real-time data instead of relying on static warehouse tables.