<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>article Fraud Detection Feature Engineering with Structured Streaming Real-Time Mode and Lakebase in Technical Blog</title>
    <link>https://community.databricks.com/t5/technical-blog/fraud-detection-feature-engineering-with-structured-streaming/ba-p/151308</link>
    <description>&lt;H1&gt;&lt;STRONG&gt;Introduction&lt;/STRONG&gt;&lt;/H1&gt;
&lt;H3&gt;&lt;STRONG&gt;Fraud Detection in Financial Services&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;Every second, thousands of payment transactions flow through financial networks — card swipes at checkout, online purchases, mobile payments. Behind each one, a fraud detection system must evaluate the transaction in real time and decide whether to approve or block it before the customer is even aware a check is happening. In financial services, this is a zero-tolerance domain: a missed fraud event costs real money, and a false positive means a legitimate customer is turned away at the point of sale.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;What makes fraud particularly difficult to catch is that it doesn't stand still. Fraudsters adapt rapidly, probing for gaps in detection logic and shifting tactics the moment a pattern is identified. Effective detection requires more than knowing what a single transaction looks like — it requires understanding that transaction in context. How does it compare to this user's behavior over the last 10 minutes? Is the location consistent with where they transacted moments ago? Is the amount anomalous given their history at this merchant category?&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;This is where machine learning models come in. Modern fraud detection systems use ML models that score every transaction in real time. But the quality of those scores depends entirely on the quality of the &lt;/SPAN&gt;&lt;STRONG&gt;features&lt;/STRONG&gt;&lt;SPAN&gt; fed to the model — and features are only as good as they are fresh.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;What is Feature Engineering?&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;Feature engineering is the process of transforming raw transaction data into structured signals that an ML model can act on. A raw transaction record — a transaction ID, an amount, a merchant code, a timestamp — carries limited signal on its own. Features are derived characteristics that make behavioral patterns visible to the model.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;For fraud detection, the most predictive features are those that capture a user's recent history:&lt;/SPAN&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Velocity features&lt;/STRONG&gt;&lt;SPAN&gt;: How many transactions has this user made in the last 5 minutes? In the last hour? A sudden spike in transaction count — especially across geographies — is one of the strongest fraud signals available.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Amount aggregates&lt;/STRONG&gt;&lt;SPAN&gt;: What is the sum of transaction amounts in the last 10 minutes? Is the current transaction amount significantly higher than this user's historical average at this merchant category?&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Geographic anomalies&lt;/STRONG&gt;&lt;SPAN&gt;: What is the distance between the current transaction location and the last known location? Is it physically possible to travel that distance in the time elapsed between the two transactions?&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;IP and device signals&lt;/STRONG&gt;&lt;SPAN&gt;: Has the IP address changed since the last transaction? Is this a device that has never been seen before for this user?&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Merchant risk indicators&lt;/STRONG&gt;&lt;SPAN&gt;: Is this merchant category associated with elevated fraud rates? Is this merchant seeing a sudden spike in transaction volume from new users?&lt;/SPAN&gt;&lt;/LI&gt;
&lt;/UL&gt;
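&lt;P&gt;&lt;SPAN&gt;To make the geographic-anomaly idea concrete, here is a minimal pure-Python sketch (the function names and the 900 km/h threshold are illustrative, not the pipeline's implementation): great-circle distance via the haversine formula, followed by a check on the implied travel speed.&lt;/SPAN&gt;&lt;/P&gt;

```python
import math

def haversine_km(lat1, lon1, lat2, lon2):
    """Great-circle distance between two points, in kilometers."""
    r = 6371.0  # mean Earth radius in km
    p1, p2 = math.radians(lat1), math.radians(lat2)
    dp = math.radians(lat2 - lat1)
    dl = math.radians(lon2 - lon1)
    a = math.sin(dp / 2) ** 2 + math.cos(p1) * math.cos(p2) * math.sin(dl / 2) ** 2
    return 2 * r * math.asin(math.sqrt(a))

def is_impossible_travel(dist_km, elapsed_seconds, max_speed_kmh=900.0):
    """Flag movement faster than a commercial flight (~900 km/h)."""
    if elapsed_seconds <= 0:
        return dist_km > 0
    return dist_km / (elapsed_seconds / 3600.0) > max_speed_kmh

# New York to London in 30 minutes is not physically possible
d = haversine_km(40.7128, -74.0060, 51.5074, -0.1278)
flag = is_impossible_travel(d, elapsed_seconds=1800)
```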
&lt;P&gt;&lt;SPAN&gt;These features are computed per transaction and stored in a &lt;/SPAN&gt;&lt;STRONG&gt;feature store&lt;/STRONG&gt;&lt;SPAN&gt; — a low-latency database that the model scoring service reads at inference time. The freshness of these features directly determines the quality of the fraud score. A velocity feature computed five minutes ago in a batch pipeline may already be stale by the time a fraudster completes their third transaction.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;This is why latency is not a performance metric in fraud detection — it is a business constraint. The window between a fraudster's first transaction and detection is where losses occur. Shrinking that window from seconds to milliseconds directly translates to fewer fraudulent transactions approved.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;The Current Challenges&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;Getting real-time features into production is harder than it sounds — and the gap between data science and production engineering is where much of the difficulty lives.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;Data scientists work in Spark.&lt;/STRONG&gt;&lt;SPAN&gt; Apache Spark™ is the de facto standard for feature development and model training. Data scientists prototype feature logic in PySpark, iterate quickly, and validate results against historical data. Spark handles large-scale computation efficiently, and most organizations have deep institutional knowledge of its APIs and ecosystem.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;But online feature serving has historically required a second system.&lt;/STRONG&gt;&lt;SPAN&gt; At inference time, features need to be served in milliseconds — far below what traditional Spark micro-batch processing can achieve. This has typically required deploying a specialized streaming engine like Apache Flink or Kafka Streams alongside Spark to power the online feature pipeline. These systems can deliver sub-second latency, but they operate very differently from Spark.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;The result is a fragmented architecture with three compounding problems:&lt;/STRONG&gt;&lt;/P&gt;
&lt;OL&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Duplicated logic&lt;/STRONG&gt;&lt;SPAN&gt;: The same feature — for example, "sum of transaction amounts in the last 10 minutes" — must be implemented twice: once in PySpark for training data generation, and again in Flink or Java for the live inference pipeline. This doubles the development and testing effort for every new feature.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Logic drift&lt;/STRONG&gt;&lt;SPAN&gt;: As the two implementations diverge over time — through different edge case handling, different watermark logic, different null treatment — the model ends up being trained on one version of reality and scored on another. This degrades model accuracy in ways that are subtle and difficult to diagnose.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Operational overhead&lt;/STRONG&gt;&lt;SPAN&gt;: Running two streaming systems means two separate monitoring stacks, two sets of failure modes, and a broader pool of specialized engineering expertise required to maintain both. Every incident has twice as many potential root causes. Onboarding new engineers takes longer. And every change to a feature requires coordinated updates across both codebases.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;/OL&gt;
&lt;P&gt;&lt;STRONG&gt;Real-Time Mode (RTM) changes this equation.&lt;/STRONG&gt;&lt;SPAN&gt; By delivering sub-second latency within the Spark API that data scientists already use, RTM eliminates the need for a second engine — and with it, the duplication, drift, and operational tax that come with maintaining one.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;What is Real-Time Mode?&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;Launched in Public Preview in August 2025, RTM is a new trigger type for Spark Structured Streaming that enables consistent sub-second latencies — as low as 5ms end-to-end — without leaving the Spark API.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;How Real-Time Mode Works&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;Three key technical innovations drive RTM's performance:&lt;/SPAN&gt;&lt;/P&gt;
&lt;OL&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Continuous data flow&lt;/STRONG&gt;&lt;SPAN&gt;: Data is processed as it arrives rather than accumulated into discrete batches. Records flow through the pipeline the moment they appear, without waiting for a batch boundary.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Pipeline scheduling&lt;/STRONG&gt;&lt;SPAN&gt;: All query stages run simultaneously. Downstream tasks begin processing as soon as upstream tasks produce output, without waiting for an entire stage to complete.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Streaming shuffle&lt;/STRONG&gt;&lt;SPAN&gt;: Data is passed between tasks immediately, bypassing the latency bottlenecks of traditional disk-based shuffles. When a query requires a &lt;/SPAN&gt;&lt;SPAN&gt;groupBy&lt;/SPAN&gt;&lt;SPAN&gt;, records move between executors as they are produced.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;/OL&gt;
&lt;P&gt;&lt;SPAN&gt;Together, these innovations transform Spark into a high-performance, low-latency engine capable of handling demanding operational workloads.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Execution Model Comparison&lt;/STRONG&gt;&lt;/H3&gt;
&lt;TABLE&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Model&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Processing Approach&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Typical Latency&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Micro-Batch&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Sequential stage execution; data accumulated into periodic batches&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Seconds (typically 1–5s)&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Real-Time Mode&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Concurrent stage scheduling; continuous pull-based execution&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Sub-400ms (stable)&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;H3&gt;&lt;STRONG&gt;A Single-Line Migration Path&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;One practical advantage of RTM is how easy it is to adopt — or to dial back. Business requirements change: a pipeline that starts with a 1-minute SLA today might need sub-second latency tomorrow, or might benefit from a daily schedule for cost savings. With Spark, that is a single line of code:&lt;/SPAN&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;# Daily or hourly batch
.trigger(availableNow=True)

# Sub-second real-time processing (the interval is the
# checkpoint frequency, not the processing latency)
.trigger(realTime="5 minutes")&lt;/LI-CODE&gt;
&lt;P&gt;&lt;SPAN&gt;This also directly solves the logic drift problem described above. When feature transformation logic lives in a single Spark codebase, there is no second implementation to drift. Models are trained and scored on exactly the same logic.&lt;/SPAN&gt;&lt;/P&gt;
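&lt;P&gt;&lt;SPAN&gt;The same principle in miniature: when one function defines a feature, the batch path that builds training data and the streaming path that serves inference cannot diverge. The sketch below is purely illustrative; amount_sum_10m and its inputs are invented for this example, not taken from the pipeline's code.&lt;/SPAN&gt;&lt;/P&gt;

```python
def amount_sum_10m(events, now_s, window_s=600):
    """Sum of transaction amounts in the last 10 minutes.

    One definition, shared by the training (batch replay) and the
    serving (live stream) paths -- so there is no second copy to drift.
    """
    return sum(amount for amount, t_s in events if 0 <= now_s - t_s <= window_s)

# (amount, epoch-seconds) pairs for one user
history = [(10.0, 100), (25.0, 400), (40.0, 2000)]

# Training path: replay history at a past point in time
train_feature = amount_sum_10m(history, now_s=700)    # 10.0 + 25.0

# Serving path: identical logic evaluated at "now"
serve_feature = amount_sum_10m(history, now_s=2100)   # only the 40.0 event
```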
&lt;H2&gt;&lt;STRONG&gt;Putting It Into Practice&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;Now that we’ve covered why feature freshness is critical for fraud detection and how Real-Time Mode delivers sub-second processing within the Spark API, let’s build it end-to-end. In the walkthrough that follows, we’ll construct a complete real-time feature engineering pipeline that:&lt;/SPAN&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;SPAN&gt;Generate Synthentic credit card transactions and publish them to Kafka&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;SPAN&gt;Ingests the transactions from Kafka and computes&amp;nbsp; stateless and stateful features&amp;nbsp; using transformWithState&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;SPAN&gt;Writes enriched features to Lakebase PostgreSQL via ForeachWriter for sub-second model serving&lt;/SPAN&gt;&lt;/LI&gt;
&lt;/UL&gt;
&lt;P&gt;&lt;SPAN&gt;The full pipeline runs end-to-end with p99 latency under 500ms. All code is available in the companion &lt;/SPAN&gt;&lt;A href="https://github.com/databricks-solutions/databricks-blogposts/tree/main/2026-02-realtime-streaming-fraud-feature-engineering-with-lakebase" target="_blank" rel="noopener"&gt;&lt;SPAN&gt;GitHub repository&lt;/SPAN&gt;&lt;/A&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;Solution Overview&lt;/STRONG&gt;&lt;/H2&gt;
&lt;H3&gt;&lt;STRONG&gt;Prerequisites&lt;/STRONG&gt;&lt;/H3&gt;
&lt;TABLE&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Requirement&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Details&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Databricks Runtime&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;17.3 LTS or later&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Compute type&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Dedicated access mode only (not standard, serverless, or Lakeflow)&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Autoscaling&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Must be disabled&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Photon&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Must be disabled&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Spark config&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;&lt;LI-CODE lang="java"&gt;spark.databricks.streaming.realTimeMode.enabled = true&lt;/LI-CODE&gt;&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Unity Catalog&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Enabled (for governance and lineage)&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Kafka topic&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Message broker for buffering transactions between the generator and pipeline&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Databricks Lakebase&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Serverless Postgres feature table for low-latency storage and retrieval&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;H2&gt;&lt;STRONG&gt;The Architecture: Six Stages from Stream to Serving&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;The pipeline processes transactions through six distinct stages, each optimized for performance and reliability:&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="RTM-Community-Blog-Architecture-Six-Stages.png" style="width: 999px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/24985iFAA2A071B7940B36/image-size/large?v=v2&amp;amp;px=999" role="button" title="RTM-Community-Blog-Architecture-Six-Stages.png" alt="RTM-Community-Blog-Architecture-Six-Stages.png" /&gt;&lt;/span&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;The architecture uses Apache Spark™ for stream processing, Kafka for data ingestion, RocksDB for state management, and Databricks Lakebase Postgres for feature storage and serving.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;What makes this architecture distinctive is the combination of &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;transformWithState&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt; for complex stateful computations with RTM's continuous execution model — every transaction is processed at sub-second latency without sacrificing fault tolerance or exactly-once semantics.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;Stage 1: High-Throughput Kafka Ingestion&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;Synthetic transaction data, generated to mimic real-world credit card activity, flows continuously into Kafka. We use the &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;dbldatagen&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt; library to produce realistic distributions for testing and development: log-normal amount distributions (matching real spending patterns), weighted merchant categories (restaurants more common than jewelry stores), and realistic geographic coordinates.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;I&gt;&lt;SPAN&gt;The data generation logic is encapsulated in the&amp;nbsp;&lt;A href="https://github.com/databricks-solutions/databricks-blogposts/blob/34cc2f9cf226820ff32228875c944dd8ee501d38/2026-02-realtime-streaming-fraud-feature-engineering-with-lakebase/utils/data_generator.py#L28" target="_blank" rel="noopener"&gt;&lt;FONT face="courier new,courier" color="#339966"&gt;TransactionDataGenerator&lt;/FONT&gt;&lt;/A&gt; class:&lt;/SPAN&gt;&lt;/I&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;spec = (dg.DataGenerator(sparkSession=self.spark, name="transactions", rows=num_rows, partitions=1)
            .withIdOutput()            
            ... 
        )
        
        df = spec.build(withStreaming=True, options={"rowsPerSecond": rows_per_second}) \
            .drop("user_id_num", "merchant_id_num", "id") \
            .withColumn("timestamp", current_timestamp())&lt;/LI-CODE&gt;
&lt;P&gt;&lt;SPAN&gt;The &lt;FONT color="#339966"&gt;withStreaming=True&lt;/FONT&gt; option produces a continuous stream at 1,000 rows per second.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;I&gt;&lt;SPAN&gt;This class is then called in the [&lt;A href="https://github.com/databricks-solutions/databricks-blogposts/blob/main/2026-02-realtime-streaming-fraud-feature-engineering-with-lakebase/01_generate_streaming_data.ipynb" target="_blank" rel="noopener"&gt;01_generate_streaming_data&lt;/A&gt; notebook →&amp;nbsp;3: Generate Streaming Transaction Data] to generate the data:&lt;/SPAN&gt;&lt;/I&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;# Generate streaming transaction data
data_gen = TransactionDataGenerator(spark)
df_transactions = data_gen.generate_transaction_data(
    num_users=config.data_gen_config["num_users"],              # unique users
    num_merchants=config.data_gen_config["num_merchants"],      # unique merchants
    rows_per_second=config.data_gen_config["rows_per_second"]     # transactions per second
)&lt;/LI-CODE&gt;
&lt;P&gt;&lt;STRONG&gt;Serializing to Kafka:&lt;/STRONG&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;The generated transactions are then serialized as JSON and written to Kafka. The &lt;/SPAN&gt;&lt;FONT face="courier new,courier" color="#339966"&gt;&lt;SPAN&gt;transaction_id&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt; is used as the message key, ensuring that all events with the same ID maintain ordering, which is critical for downstream stateful processing.&lt;/SPAN&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;#Convert row to JSON String
json_df = df_transactions.select(to_json(struct(*[col(c) for c in df_transactions.columns])).alias("value"), col("transaction_id").alias("key"))

kafkaWriter = (
    json_df
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_SERVER)
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.jaas.config", f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{KAFKA_USERNAME}' password='{KAFKA_SECRET}';")
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("topic", KAFKA_TOPIC)
    .option("failOnDataLoss", "true")
    .option("checkpointLocation", CHECKPOINT_LOCATION)
)

# Start the Kafka producer
kafkaQuery = kafkaWriter.start()&lt;/LI-CODE&gt;
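&lt;P&gt;&lt;SPAN&gt;Why the key matters: Kafka assigns each message to a partition by hashing its key, so all messages with the same key land on the same partition, where Kafka guarantees order. The sketch below illustrates the idea with a stand-in hash; Kafka's default partitioner actually uses murmur2, and pick_partition is an invented name, not a Kafka API.&lt;/SPAN&gt;&lt;/P&gt;

```python
import hashlib

def pick_partition(key: str, num_partitions: int) -> int:
    """Stand-in for Kafka's key-based partitioning: same key, same partition.

    Kafka's default partitioner hashes the key with murmur2; md5 is used
    here only to keep the sketch dependency-free and deterministic.
    """
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Every message for a given transaction_id routes to one partition,
# and Kafka preserves order within a partition.
p_first = pick_partition("txn-0001", 8)
p_retry = pick_partition("txn-0001", 8)
```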
&lt;H2&gt;&lt;STRONG&gt;Stage 2: Enabling Real-Time Mode&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;With data flowing into Kafka, the next step is configuring the pipeline to process it with sub-second latency.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;Cluster Configuration:&lt;/STRONG&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;Set the following Spark configuration in the cluster config before starting the pipeline. This is required for RTM to function correctly. More details can be found &lt;A title="Real-time mode in Structured Streaming Cluster Configuration" href="https://docs.databricks.com/aws/en/structured-streaming/real-time#cluster-configuration" target="_blank" rel="noopener"&gt;here&lt;/A&gt;.&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;
&lt;LI-CODE lang="java"&gt;spark.conf.set("spark.databricks.streaming.realTimeMode.enabled", "true")&lt;/LI-CODE&gt;
&lt;P&gt;&lt;STRONG&gt;The RTM Trigger:&lt;/STRONG&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;.trigger(realTime="5 minutes")&lt;/LI-CODE&gt;
&lt;P&gt;&lt;SPAN&gt;The &lt;/SPAN&gt;&lt;SPAN&gt;"5 minutes"&lt;/SPAN&gt;&lt;SPAN&gt; parameter sets the &lt;/SPAN&gt;&lt;STRONG&gt;checkpoint interval&lt;/STRONG&gt;&lt;SPAN&gt; — how often the state is persisted to cloud storage for fault tolerance. Between checkpoints, Spark processes incoming transactions continuously, event by event. Longer intervals mean less checkpointing overhead but a longer replay window on failure; shorter intervals increase durability at a small latency cost.&lt;/SPAN&gt;&lt;/P&gt;
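&lt;P&gt;&lt;SPAN&gt;A back-of-envelope way to reason about that trade-off, using this pipeline's generation rate of 1,000 rows per second (worst_case_replay is an illustrative helper, not part of the pipeline):&lt;/SPAN&gt;&lt;/P&gt;

```python
def worst_case_replay(rows_per_second: int, checkpoint_interval_s: int) -> int:
    """Upper bound on records re-processed after a failure: everything
    that arrived since the last completed checkpoint."""
    return rows_per_second * checkpoint_interval_s

# 5-minute checkpoint interval at this pipeline's 1,000 rows/sec
replay_5m = worst_case_replay(1_000, 5 * 60)

# A 30-second interval shrinks the replay window tenfold,
# at the cost of more frequent checkpointing overhead
replay_30s = worst_case_replay(1_000, 30)
```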
&lt;P&gt;&lt;STRONG&gt;Note:&lt;/STRONG&gt;&lt;SPAN&gt; RTM supports &lt;/SPAN&gt;&lt;SPAN&gt;outputMode("update")&lt;/SPAN&gt;&lt;SPAN&gt; only. Unlike micro-batch mode — which can buffer multiple records before emitting results — RTM emits incremental updates as each record is processed.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;Stage 3: Stateless Feature Engineering&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;Stateless features are computed per transaction without any historical context. These include time-based patterns (hour of day, day of week, is-weekend flag), amount transformations (log-normalized amount, amount tier), and merchant risk indicators (high-risk category flags, merchant velocity scores). All 17 features are computed in a single streaming pass.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;I&gt;&lt;SPAN&gt;[&lt;/SPAN&gt;&lt;/I&gt;&lt;A href="https://github.com/databricks-solutions/databricks-blogposts/blob/main/2026-02-realtime-streaming-fraud-feature-engineering-with-lakebase/02_streaming_fraud_detection_pipeline.ipynb" target="_self"&gt;&lt;I&gt;&lt;SPAN&gt;02_streaming_fraud_detection_pipeline&lt;/SPAN&gt;&lt;/I&gt;&lt;/A&gt;&lt;I&gt;&lt;SPAN&gt; notebook →&amp;nbsp;5: Apply Stateless Features]&amp;nbsp;&amp;nbsp;&lt;/SPAN&gt;&lt;/I&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;df_with_stateless_features = feature_engineer.apply_all_features(kafka_df)&lt;/LI-CODE&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;I&gt;[&lt;A href="https://github.com/databricks-solutions/databricks-blogposts/blob/main/2026-02-realtime-streaming-fraud-feature-engineering-with-lakebase/utils/feature_engineering.py#L294" target="_self"&gt;feature_engineering.py#apply_all_features&lt;/A&gt;&lt;/I&gt;&lt;I&gt;]&lt;/I&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;def apply_all_features(self, df, include_optional=True):
	df_features = self.create_time_based_features(df)
	df_features = self.create_amount_features(df_features)
	df_features = self.create_merchant_features(df_features)
	
	if include_optional:
		df_features = self.create_location_features(df_features)
		df_features = self.create_device_features(df_features)
		df_features = self.create_network_features(df_features)
	
	return df_features&lt;/LI-CODE&gt;
&lt;P&gt;&lt;SPAN&gt;These features are computed in under &lt;/SPAN&gt;&lt;STRONG&gt;75ms (p99)&lt;/STRONG&gt;&lt;SPAN&gt;, providing immediate per-transaction context before the stateful layer adds historical patterns.&lt;/SPAN&gt;&lt;/P&gt;
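&lt;P&gt;&lt;SPAN&gt;For intuition, the underlying transformations are simple. The sketch below shows a few of them in pure Python with hypothetical field names and an illustrative risk-category set; the pipeline expresses the same logic as Spark column expressions in feature_engineering.py.&lt;/SPAN&gt;&lt;/P&gt;

```python
import math
from datetime import datetime

# Illustrative set, not the pipeline's actual risk list
HIGH_RISK_CATEGORIES = {"jewelry", "electronics", "gift_cards"}

def stateless_features(amount: float, ts: datetime, merchant_category: str) -> dict:
    """Per-transaction features that require no historical context."""
    return {
        "hour_of_day": ts.hour,
        "day_of_week": ts.weekday(),            # 0 = Monday
        "is_weekend": ts.weekday() >= 5,
        "log_amount": math.log1p(amount),       # compresses heavy-tailed amounts
        "amount_tier": min(int(amount // 100), 9),
        "is_high_risk_category": merchant_category in HIGH_RISK_CATEGORIES,
    }

# A late-night weekend jewelry purchase
feats = stateless_features(249.99, datetime(2025, 8, 16, 23, 5), "jewelry")
```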
&lt;H2&gt;&lt;STRONG&gt;Stage 4: Stateful Processing with transformWithState&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;Fraud patterns rarely appear in a single transaction. Velocity spikes, impossible travel, and amount anomalies only become visible when you look across a user's transaction history. That requires maintaining per-user state across events — and that is where &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;transformWithState&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt; comes in.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Why transformWithState?&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;Apache Spark™'s &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;transformWithState&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt; API is purpose-built for this kind of workload:&lt;/SPAN&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Rich composite state types&lt;/STRONG&gt;&lt;SPAN&gt;: &lt;/SPAN&gt;&lt;SPAN&gt;ValueState&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;ListState&lt;/SPAN&gt;&lt;SPAN&gt;, and &lt;/SPAN&gt;&lt;SPAN&gt;MapState&lt;/SPAN&gt;&lt;SPAN&gt; allow per-entity state structures that link transactions for detecting velocity, fan-out, and anomaly patterns&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Built-in TTL&lt;/STRONG&gt;&lt;SPAN&gt;: State entries expire automatically — in this pipeline, after 24 hours of user inactivity — preventing unbounded state growth&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Stateful operator chaining&lt;/STRONG&gt;&lt;SPAN&gt;: Multiple stateful operations can be composed in a single pipeline&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;STRONG&gt;Safe state evolution&lt;/STRONG&gt;&lt;SPAN&gt;: State schemas can be updated without a cold start, enabling iterative rule updates in production&lt;/SPAN&gt;&lt;/LI&gt;
&lt;/UL&gt;
&lt;P&gt;&lt;SPAN&gt;Built on RocksDB, &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;transformWithState&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt; handles high-cardinality workloads efficiently while preserving Structured Streaming's exactly-once guarantees.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;State Design&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;Each user's state stores a rich context object: recent transaction history, last-known location, IP address history, rolling amount totals, and velocity counters. Storing all of this in a single state object means every lookup is local — no external service calls, no added latency.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;I&gt;&lt;SPAN&gt;[&lt;/SPAN&gt;&lt;/I&gt;&lt;A href="https://github.com/databricks-solutions/databricks-blogposts/blob/main/2026-02-realtime-streaming-fraud-feature-engineering-with-lakebase/utils/feature_engineering.py#L457" target="_self"&gt;&lt;I&gt;&lt;SPAN&gt;feature_engineering.py#FraudDetectionFeaturesProcessor&lt;/SPAN&gt;&lt;/I&gt;&lt;/A&gt;&lt;I&gt;&lt;SPAN&gt;]&lt;/SPAN&gt;&lt;/I&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;state_schema = StructType([
	StructField("transaction_count", IntegerType(), False),
	StructField("last_timestamp", TimestampType(), True),
	StructField("last_ip_address", StringType(), True),
	StructField("last_latitude", DoubleType(), True),
	StructField("last_longitude", DoubleType(), True),
	StructField("ip_change_count", IntegerType(), False),
	StructField("total_amount", DoubleType(), False),
	StructField("avg_amount", DoubleType(), False),
	StructField("max_amount", DoubleType(), False),
	StructField("recent_timestamps", ArrayType(TimestampType()), False),
	StructField("recent_amounts", ArrayType(DoubleType()), False)
])&lt;/LI-CODE&gt;
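&lt;P&gt;&lt;SPAN&gt;The velocity counters in this state object reduce to maintaining a pruned list of recent event timestamps. Below is a pure-Python sketch of the update step; update_velocity is an invented name, the pipeline performs the equivalent inside its stateful processor, and the 50-entry cap mirrors the processor's buffer bound.&lt;/SPAN&gt;&lt;/P&gt;

```python
from datetime import datetime, timedelta

def update_velocity(recent_timestamps, now,
                    window=timedelta(minutes=10), max_entries=50):
    """Append the new event, drop entries outside the window, cap the
    buffer, and return (pruned_list, count_in_window)."""
    pruned = [t for t in recent_timestamps if now - t <= window]
    pruned.append(now)
    pruned = pruned[-max_entries:]   # bound state size, as in the processor
    return pruned, len(pruned)

t0 = datetime(2025, 8, 16, 12, 0, 0)
history = [t0 - timedelta(minutes=30),   # outside the 10-minute window
           t0 - timedelta(minutes=5),
           t0 - timedelta(minutes=1)]
pruned, velocity_10m = update_velocity(history, t0)
```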
&lt;H3&gt;&lt;STRONG&gt;Core Processing Logic&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;The &lt;/SPAN&gt;&lt;SPAN&gt;FraudDetectionFeaturesProcessor&lt;/SPAN&gt;&lt;SPAN&gt; encapsulates all stateful computations: transaction velocity, IP change tracking, geographic anomalies (impossible travel detection), and amount-based anomaly scoring. The pipeline calls &lt;/SPAN&gt;&lt;SPAN&gt;handleInputRows&lt;/SPAN&gt;&lt;SPAN&gt; for each transaction, which updates state and emits the computed features.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;Important RTM behavior:&lt;/STRONG&gt;&lt;SPAN&gt; In Real-Time Mode, &lt;/SPAN&gt;&lt;SPAN&gt;handleInputRows&lt;/SPAN&gt;&lt;SPAN&gt; is called &lt;/SPAN&gt;&lt;STRONG&gt;once per record&lt;/STRONG&gt;&lt;SPAN&gt;. The &lt;/SPAN&gt;&lt;SPAN&gt;inputRows&lt;/SPAN&gt;&lt;SPAN&gt; iterator returns a single value, unlike micro-batch mode where multiple records per key may be batched together. Design your processor accordingly.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;I&gt;&lt;SPAN&gt;[&lt;/SPAN&gt;&lt;/I&gt;&lt;A href="https://github.com/databricks-solutions/databricks-blogposts/blob/main/2026-02-realtime-streaming-fraud-feature-engineering-with-lakebase/utils/feature_engineering.py#L429" target="_blank" rel="noopener"&gt;&lt;I&gt;&lt;SPAN&gt;feature_engineering.py#FraudDetectionFeaturesProcessor&lt;/SPAN&gt;&lt;/I&gt;&lt;/A&gt;&lt;I&gt;&lt;SPAN&gt;]&lt;/SPAN&gt;&lt;/I&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;class FraudDetectionFeaturesProcessor:
    """
    Stateful processor for real‑time fraud detection using
    ``transformWithState``.

    The processor maintains a single consolidated state per user and
    computes a rich set of fraud‑related features, including:
    * Transaction velocity (last hour / last 10 min)
    * IP address change tracking
    * Geographic anomalies (impossible travel)
    * Amount‑based anomalies (z‑score, ratio to user avg/max)

    Implementation notes:
    * ``sorted`` is applied directly to the input iterator, avoiding an
      extra ``list`` allocation and an explicit ``list.sort`` call.
    * The recent-transaction buffers are capped at 50 entries to bound
      state size.
    * ``numpy`` is imported locally because it is only needed in
      ``handleInputRows``.
    """

    def init(self, handle) -&amp;gt; None:
        """
        Initialize the processor with a single consolidated ``ValueState``.
        """
        self.handle = handle

        # Consolidated state schema (one object per user)
        state_schema = StructType([
            StructField("transaction_count", IntegerType(), False),
            StructField("last_timestamp", TimestampType(), True),
            StructField("last_ip_address", StringType(), True),
            StructField("last_latitude", DoubleType(), True),
            StructField("last_longitude", DoubleType(), True),
            StructField("ip_change_count", IntegerType(), False),
            StructField("total_amount", DoubleType(), False),
            StructField("avg_amount", DoubleType(), False),
            StructField("max_amount", DoubleType(), False),
            StructField("recent_timestamps", ArrayType(TimestampType()), False),
            StructField("recent_amounts", ArrayType(DoubleType()), False)
        ])

        self.user_state = handle.getValueState(
            "user_fraud_state",
            state_schema,
            ttlDurationMs=3600000  # 1 hour TTL
        )

    def handleInputRows(self, key, rows, timer_values) -&amp;gt; Iterator[Row]:
        """
        Process a batch of transactions for a single ``user_id`` and emit
        fraud‑detection features.
        """
        import numpy as np  # Local import: only this method needs it

        user_id, = key

        # Sort incoming rows by timestamp without an intermediate mutable list
        row_list = sorted(rows, key=lambda r: r["timestamp"])
        if not row_list:
            return

        # Load existing state or initialise defaults
        if self.user_state.exists():
            (
                prev_count,
                prev_last_time,
                prev_ip,
                prev_lat,
                prev_lon,
                prev_ip_changes,
                prev_total_amount,
                prev_avg_amount,
                prev_max_amount,
                prev_times,
                prev_amounts,
            ) = self.user_state.get()
            prev_times = list(prev_times) if prev_times else []
            prev_amounts = list(prev_amounts) if prev_amounts else []
        else:
            prev_count = 0
            prev_last_time = None
            prev_ip = None
            prev_lat = None
            prev_lon = None
            prev_ip_changes = 0
            prev_total_amount = 0.0
            prev_avg_amount = 0.0
            prev_max_amount = 0.0
            prev_times = []
            prev_amounts = []

        results = []

        for row in row_list:
            current_time = row["timestamp"]
            current_ip = row["ip_address"]
            current_lat = row["latitude"]
            current_lon = row["longitude"]
            current_amount = row["amount"]

            # Update counters
            prev_count += 1

            # Time delta since previous transaction
            if prev_last_time is not None:
                # Ensure both datetimes are offset-naive
                if hasattr(current_time, 'tzinfo') and current_time.tzinfo is not None:
                    current_time = current_time.replace(tzinfo=None)
                if hasattr(prev_last_time, 'tzinfo') and prev_last_time.tzinfo is not None:
                    prev_last_time = prev_last_time.replace(tzinfo=None)
                time_diff = (current_time - prev_last_time).total_seconds()
            else:
                time_diff = None

            # IP change detection
            ip_changed = 0
            if prev_ip is not None and current_ip != prev_ip:
                ip_changed = 1
                prev_ip_changes += 1

            # Geographic distance &amp;amp; velocity
            distance_km = None
            velocity_kmh = None
            if prev_lat is not None and prev_lon is not None:
                distance_km = calculate_haversine_distance(
                    prev_lat, prev_lon, current_lat, current_lon
                )
                if distance_km is not None and time_diff and time_diff &amp;gt; 0:
                    velocity_kmh = (distance_km / time_diff) * 3600

            # Capture historical stats BEFORE updating with the current transaction
            # so anomaly ratios measure how current_amount compares to past behaviour.
            historical_avg_amount = prev_avg_amount
            historical_max_amount = prev_max_amount

            amount_vs_avg_ratio = (
                current_amount / historical_avg_amount if historical_avg_amount &amp;gt; 0 else 1.0
            )
            amount_vs_max_ratio = (
                current_amount / historical_max_amount if historical_max_amount &amp;gt; 0 else 1.0
            )

            # Z‑score (requires at least 3 prior amounts)
            amount_zscore = None
            if len(prev_amounts) &amp;gt;= 3:
                std = np.std(prev_amounts)
                if std &amp;gt; 0 and historical_avg_amount is not None:
                    amount_zscore = (current_amount - historical_avg_amount) / std

            # Update amount statistics with the current transaction for future rows
            prev_total_amount += current_amount
            prev_avg_amount = prev_total_amount / prev_count
            prev_max_amount = builtins.max(prev_max_amount, current_amount)

            # Update recent‑transaction buffers (capped at 50)
            prev_times.append(current_time)
            prev_amounts.append(current_amount)
            if len(prev_times) &amp;gt; 50:
                prev_times = prev_times[-50:]
                prev_amounts = prev_amounts[-50:]

            # Transaction counts in sliding windows
            one_hour_ago = current_time - timedelta(hours=1)
            ten_min_ago = current_time - timedelta(minutes=10)

            trans_last_hour = builtins.sum(1 for t in prev_times if t &amp;gt;= one_hour_ago)
            trans_last_10min = builtins.sum(1 for t in prev_times if t &amp;gt;= ten_min_ago)

            # Fraud indicators
            is_rapid = 1 if trans_last_10min &amp;gt;= 5 else 0
            is_impossible_travel = (
                1 if velocity_kmh is not None and velocity_kmh &amp;gt; 800 else 0
            )
            is_amount_anomaly = (
                1 if amount_zscore is not None and builtins.abs(amount_zscore) &amp;gt; 3 else 0
            )

            # Fraud score (capped at 100)
            fraud_score = 0.0
            fraud_score += 20 if is_rapid else 0
            fraud_score += 30 if is_impossible_travel else 0
            fraud_score += 25 if is_amount_anomaly else 0
            fraud_score += 15 if prev_ip_changes &amp;gt;= 5 else 0
            fraud_score += 10 if trans_last_hour &amp;gt;= 10 else 0
            fraud_score = builtins.min(fraud_score, 100.0)

            is_fraud_pred = 1 if fraud_score &amp;gt;= 50 else 0
            
            processing_timestamp = datetime.now()

            # Assemble output row
            results.append(
                Row(
                    transaction_id=row["transaction_id"],
                    user_id=user_id,
                    timestamp=current_time,
                    amount=current_amount,
                    merchant_id=row["merchant_id"],
                    merchant_category=row["merchant_category"],
                    currency=row["currency"],
                    payment_method=row["payment_method"],
                    card_type=row["card_type"],
                    device_id=row["device_id"],
                    ip_address=current_ip,
                    latitude=current_lat,
                    longitude=current_lon,
                    # Stateless features (pass-through)
                    month=row["month"],
                    hour=row["hour"],
                    day_of_week=row["day_of_week"],
                    is_business_hour=row["is_business_hour"],
                    is_weekend=row["is_weekend"],
                    is_night=row["is_night"],
                    hour_sin=row["hour_sin"],
                    hour_cos=row["hour_cos"],
                    amount_log=row["amount_log"],
                    amount_category=row["amount_category"],
                    is_round_amount=row["is_round_amount"],
                    is_exact_amount=row["is_exact_amount"],
                    merchant_risk_score=row["merchant_risk_score"],
                    is_private_ip=row["is_private_ip"],
                    # Stateful features
                    user_transaction_count=prev_count,
                    transactions_last_hour=trans_last_hour,
                    transactions_last_10min=trans_last_10min,
                    ip_changed=ip_changed,
                    ip_change_count_total=prev_ip_changes,
                    distance_from_last_km=distance_km,
                    velocity_kmh=velocity_kmh,
                    amount_vs_user_avg_ratio=amount_vs_avg_ratio,
                    amount_vs_user_max_ratio=amount_vs_max_ratio,
                    amount_zscore=amount_zscore,
                    seconds_since_last_transaction=time_diff,
                    is_rapid_transaction=is_rapid,
                    is_impossible_travel=is_impossible_travel,
                    is_amount_anomaly=is_amount_anomaly,
                    fraud_score=fraud_score,
                    is_fraud_prediction=is_fraud_pred,
                    processing_timestamp=processing_timestamp
                )
            )

            # Update state for next iteration
            prev_last_time = current_time
            prev_ip = current_ip
            prev_lat = current_lat
            prev_lon = current_lon

        # Persist consolidated state (atomic update)
        self.user_state.update(
            (
                prev_count,
                prev_last_time,
                prev_ip,
                prev_lat,
                prev_lon,
                prev_ip_changes,
                prev_total_amount,
                prev_avg_amount,
                prev_max_amount,
                prev_times,
                prev_amounts,
            )
        )

        # Emit results
        for row in results:
            yield row

    def close(self) -&amp;gt; None:
        """No cleanup required."""
        pass&lt;/LI-CODE&gt;
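&lt;P&gt;&lt;SPAN&gt;The processor above calls a &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;calculate_haversine_distance&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt; helper defined elsewhere in the linked utils module. For readers following along outside the repo, a self-contained sketch of such a helper (not necessarily identical to the repo's version) looks like this:&lt;/SPAN&gt;&lt;/P&gt;

```python
import math


def calculate_haversine_distance(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometers between two (lat, lon) points.

    Returns None if any coordinate is missing, mirroring how the
    processor guards against absent location data.
    """
    if None in (lat1, lon1, lat2, lon2):
        return None
    r_km = 6371.0  # mean Earth radius in kilometers
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2)
    return 2 * r_km * math.asin(math.sqrt(a))
```

&lt;P&gt;&lt;SPAN&gt;One degree of longitude at the equator is roughly 111 km, which makes a convenient sanity check. Combined with the time delta between consecutive transactions, the processor turns this distance into the velocity behind the impossible-travel flag (over 800 km/h).&lt;/SPAN&gt;&lt;/P&gt;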
&lt;H3&gt;&lt;STRONG&gt;Applying the Transformation&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;I&gt;[&lt;/I&gt;&lt;A href="https://github.com/databricks-solutions/databricks-blogposts/blob/main/2026-02-realtime-streaming-fraud-feature-engineering-with-lakebase/02_streaming_fraud_detection_pipeline.ipynb" target="_blank" rel="noopener"&gt;&lt;I&gt;02_streaming_fraud_detection_pipeline&lt;/I&gt;&lt;/A&gt;&lt;I&gt; notebook → 6: Apply Stateless Features]&lt;/I&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;df_with_fraud_features = df_with_stateless_features \
    .groupBy("user_id") \
    .transformWithState(
        statefulProcessor=FraudDetectionFeaturesProcessor(),
        outputStructType=get_fraud_detection_output_schema(),
        outputMode="update",
        timeMode="processingTime"
    )&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;The composite fraud score (0–100) produced by this stage flows directly into Lakebase, ready to be consumed by the fraud model at inference time.&lt;/SPAN&gt;&lt;/P&gt;
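&lt;P&gt;&lt;SPAN&gt;To make the score semantics explicit outside the processor, the same weighting can be written as a standalone function. The weights and the 50-point prediction threshold below are taken directly from the processor code above:&lt;/SPAN&gt;&lt;/P&gt;

```python
def composite_fraud_score(is_rapid, is_impossible_travel, is_amount_anomaly,
                          ip_change_count, trans_last_hour):
    """Weighted fraud score (0-100), mirroring FraudDetectionFeaturesProcessor."""
    score = 0.0
    score += 20 if is_rapid else 0                # >= 5 transactions in 10 min
    score += 30 if is_impossible_travel else 0    # velocity > 800 km/h
    score += 25 if is_amount_anomaly else 0       # |z-score| > 3
    score += 15 if ip_change_count >= 5 else 0    # many IP changes
    score += 10 if trans_last_hour >= 10 else 0   # high hourly volume
    return min(score, 100.0)


def is_fraud_prediction(score):
    """Binary prediction emitted alongside the score."""
    return 1 if score >= 50 else 0
```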
&lt;H2&gt;&lt;STRONG&gt;Stage 5: Optimized Lakebase PostgreSQL Writes&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;Lakebase serves as both the operational feature store and transactional database, integrated directly into the Lakehouse. This eliminates the need for a separate feature store service and the data synchronization overhead that typically comes with it.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;The Challenge&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;At 1,000 transactions per second, writing records to Postgres one by one generates 1,000 round-trips per second. At that volume, individual writes would overwhelm Lakebase and introduce seconds of latency. The solution is batching with idempotency.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Solution: Batch UPSERT with ForeachWriter&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN class="s1"&gt;Spark Structured Streaming's ForeachWriter provides a way to write custom logic to process the streaming data.&amp;nbsp; Each Spark partition creates its own ForeachWriter instance, which in turn opens a new Lakebase PostgreSQL connection, accumulates a small batch of rows (configurable by us), and flushes them as a single UPSERT, thereby keeping both latency and connection count low.&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;[&lt;A href="https://github.com/databricks-solutions/databricks-blogposts/blob/34cc2f9cf226820ff32228875c944dd8ee501d38/2026-02-realtime-streaming-fraud-feature-engineering-with-lakebase/utils/lakebase_client.py#L433" target="_self"&gt;&lt;SPAN&gt;lakebase_client.py#LakebaseForeachWriter&lt;/SPAN&gt;&lt;/A&gt;]&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;class LakebaseForeachWriter:
    """
    ForeachWriter for per-partition streaming writes to Lakebase
    
    Usage:
        writer = lakebase_client.get_foreach_writer(
            table_name="transaction_features",
            conflict_columns=["transaction_id"]
        )
        query = df.writeStream.foreach(writer).start()
    """
    
    def __init__(self, creds, database, table_name: str, column_names: List[str],
                 conflict_columns: List[str], batch_size: int = 100):
        """
        Initialize ForeachWriter
        
        Args:
            creds: lakebase credentials
            database: database name
            table_name: Target table name
            column_names: Columns to be inserted or updated
            conflict_columns: Columns for ON CONFLICT clause (e.g., ["transaction_id"])
            batch_size: Number of rows to accumulate before writing
        """

        if not column_names:
            raise ValueError("column_names must be provided and non-empty")
        self.creds = creds
        self.database = database
        self.table_name = table_name
        self.column_names = column_names
        self.conflict_columns = conflict_columns
        # Build upsert SQL
        self.columns_str = ','.join(self.column_names)
        self.placeholders = ','.join(['%s'] * len(self.column_names))
        self.conflict_str = ','.join(self.conflict_columns)
        self.update_cols = [col for col in self.column_names if col not in self.conflict_columns]
        if self.update_cols:
            self.update_str = ','.join([f"{col}=EXCLUDED.{col}" for col in self.update_cols])
            self.sql = f"""
                    INSERT INTO {self.table_name} ({self.columns_str})
                    VALUES ({self.placeholders})
                    ON CONFLICT ({self.conflict_str}) DO UPDATE SET {self.update_str}
                """
        else:
            self.update_str = ""
            self.sql = f"""
                    INSERT INTO {self.table_name} ({self.columns_str})
                    VALUES ({self.placeholders})
                    ON CONFLICT ({self.conflict_str}) DO NOTHING
                """

        self.batch_size = batch_size
        self.max_retries = 3
        self.partition_id = None
        self.epoch_id = None
        
        # Per-partition state
        self.conn = None
        self.cursor = None
        self.current_batch = []        
    
    def open(self, partition_id, epoch_id):
        """
        Open database connection for this partition and epoch
        
        Called once per partition at the start of each micro-batch. Opens a new
        PostgreSQL connection with autocommit=False for transactional safety.
        
        Args:
            partition_id: Partition ID
            epoch_id: Streaming epoch ID
            
        Returns:
            bool: True if connection successful, False otherwise
        """
        try:
            logger.info(f"Opening connection for partition {partition_id}, epoch {epoch_id}")
            
            self.partition_id = partition_id
            self.epoch_id = epoch_id

            self.conn = psycopg2.connect(
                host=self.creds['host'],
                port=self.creds['port'],
                dbname=self.database,
                user=self.creds['user'],
                password=self.creds['password'],
                connect_timeout=10,
                sslmode='require'
            )
            self.conn.autocommit = False
            self.cursor = self.conn.cursor()
            self.current_batch = []
            return True
        except Exception as e:
            logger.error(f"Error opening connection for partition {partition_id}: {e}")
            return False
    
    def process(self, row_tuple):
        """
        Process a single row from the streaming query
        
        Accumulates rows in a buffer. When the buffer reaches batch_size,
        triggers an UPSERT batch write to PostgreSQL.
        
        Args:
            row_tuple: Tuple containing row values
            
        Raises:
            Exception: If row processing fails
        """
        try:
            # column_names is validated in __init__, so no per-row check is needed
            self.current_batch.append(row_tuple)

            # Flush when batch is full
            if len(self.current_batch) &amp;gt;= self.batch_size:
                self._execute_batch()

        except Exception as e:
            logger.error(f"Error processing row: {e}")
            raise
    
    def close(self, error):
        """
        Close connection and flush remaining batch
        
        Called once per partition at the end of the micro-batch. Flushes any
        remaining rows, commits or rolls back based on error status, and
        closes the database connection.
        
        Args:
            error: Exception if processing failed, None otherwise
        """
        try:
            if error is None and self.current_batch:
                self._execute_batch()                
            elif error is not None:
                logger.error(f"Error in partition, rolling back: {error}")
                if self.conn:
                    self.conn.rollback()
        except Exception as e:
            logger.error(f"Error during close: {e}")
        finally:
            if self.cursor:
                try:
                    self.cursor.close()
                except Exception:
                    pass
            if self.conn:
                try:
                    logger.info(f"Closing connection for partition {self.partition_id}, epoch {self.epoch_id}")
                    self.conn.close()
                except Exception:
                    pass
    
    def _execute_batch(self):
        """
        Execute the accumulated batch with retry logic
        
        Performs UPSERT operation (INSERT ... ON CONFLICT DO UPDATE) on all
        accumulated rows. Uses execute_batch for efficient bulk operations.
        Includes exponential backoff retry logic for transient errors.
        """
        if not self.current_batch:
            return
        
        def _execute():
            start_time = time.time()
            logger.info(f"Writing batch of {len(self.current_batch)} rows...")
            execute_batch(self.cursor, self.sql, self.current_batch, page_size=self.batch_size)
            self.conn.commit()
            self.current_batch = []
            
            elapsed = time.time() - start_time
            logger.info(f"Batch complete in {elapsed:.2f}s")
        
        # Execute with retry
        self._with_retry(_execute)
    
    def _with_retry(self, fn):
        """
        Execute function with retry logic and exponential backoff
        
        Args:
            fn: Function to execute (should be a no-arg callable)
            
        Raises:
            Exception: If all retries fail
        """
        for attempt in range(self.max_retries):
            try:
                fn()
                return
            except (OperationalError, DatabaseError) as e:
                if attempt &amp;lt; self.max_retries - 1:
                    logger.warning(f"Retry {attempt + 1}/{self.max_retries} after error: {e}")
                    time.sleep(2 ** attempt)  # Exponential backoff
                else:
                    logger.error(f"Failed after {self.max_retries} retries")
                    raise&lt;/LI-CODE&gt;
&lt;P&gt;&lt;SPAN&gt;The &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;ON CONFLICT DO UPDATE&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt; clause makes writes idempotent: if a batch is reprocessed due to a failure, it updates existing rows rather than creating duplicates. This is what preserves exactly-once semantics through the full pipeline.&lt;/SPAN&gt;&lt;/P&gt;
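&lt;P&gt;&lt;SPAN&gt;The idempotency property is easy to demonstrate in isolation. SQLite supports the same ON CONFLICT upsert syntax as Postgres, so this small self-contained sketch (an illustration, not the pipeline's actual writer) shows a replayed batch updating rows instead of duplicating them:&lt;/SPAN&gt;&lt;/P&gt;

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()
cur.execute(
    "CREATE TABLE transaction_features ("
    "transaction_id TEXT PRIMARY KEY, fraud_score REAL)"
)

# Same INSERT ... ON CONFLICT DO UPDATE shape as the LakebaseForeachWriter SQL
upsert = """
    INSERT INTO transaction_features (transaction_id, fraud_score)
    VALUES (?, ?)
    ON CONFLICT (transaction_id) DO UPDATE SET fraud_score = excluded.fraud_score
"""
batch = [("t-001", 20.0), ("t-002", 55.0)]

cur.executemany(upsert, batch)  # first delivery
cur.executemany(upsert, batch)  # replay after a simulated failure
conn.commit()

row_count = cur.execute("SELECT COUNT(*) FROM transaction_features").fetchone()[0]
```

&lt;P&gt;&lt;SPAN&gt;After the replay the table still holds exactly two rows, one per transaction_id, which is the behavior that lets Spark safely reprocess a failed micro-batch.&lt;/SPAN&gt;&lt;/P&gt;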
&lt;P&gt;&lt;STRONG&gt;Batch size trade-offs:&lt;/STRONG&gt;&lt;/P&gt;
&lt;TABLE&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Batch size&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Profile&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;5–10 rows&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Lower latency, higher per-write overhead&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;100–500 rows&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Higher throughput, increased latency&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;&lt;SPAN&gt;For this fraud detection pipeline, &lt;/SPAN&gt;&lt;FONT color="#339966"&gt;&lt;SPAN&gt;batch_size=5&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt; is used to minimize latency.&lt;/SPAN&gt;&lt;/P&gt;
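&lt;P&gt;&lt;SPAN&gt;The choice follows from simple arithmetic. Assuming the 1,000 tps load spreads evenly across the pipeline's 4 partitions (an idealized assumption; real key skew will shift these numbers), each writer's 5-row buffer fills in about 20 ms, so batching adds little latency while still cutting round-trips five-fold:&lt;/SPAN&gt;&lt;/P&gt;

```python
tps = 1000       # sustained transactions per second
partitions = 4   # one ForeachWriter instance per partition
batch_size = 5   # rows buffered before each UPSERT

rows_per_partition_per_sec = tps / partitions           # 250 rows/s per writer
fill_time_ms = batch_size / rows_per_partition_per_sec * 1000
round_trips_per_sec = tps / batch_size                  # vs. 1,000 unbatched
```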
&lt;H3&gt;&lt;STRONG&gt;Wiring It Together&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;I&gt;[&lt;/I&gt;&lt;A href="https://github.com/databricks-solutions/databricks-blogposts/blob/main/2026-02-realtime-streaming-fraud-feature-engineering-with-lakebase/02_streaming_fraud_detection_pipeline.ipynb" target="_blank" rel="noopener"&gt;&lt;I&gt;02_streaming_fraud_detection_pipeline&lt;/I&gt;&lt;/A&gt;&lt;I&gt; notebook → 7: Write to Lakebase PostgreSQL via ForeachWriter]&amp;nbsp;&amp;nbsp;&lt;/I&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;#Get Schema from dataframe
table_schema = df_with_fraud_features.schema

#Initialize lakebase writer
lakebase_writer  = lakebase.get_foreach_writer(column_names=table_schema.names, batch_size=5)

#Start streaming query
query = df_with_fraud_features \
    .writeStream \
    .outputMode("update") \
    .foreach(lakebase_writer) \
    .option("checkpointLocation", FRAUD_PIPELINE_CHECKPOINT_LOCATION) \
    .trigger(realTime="5 minutes") \
    .start()&lt;/LI-CODE&gt;
&lt;H2&gt;&lt;STRONG&gt;Stage 6: Postgres Feature Serving&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;Once features are written to Lakebase, they are available for low-latency reads by the fraud model during inference. Because Lakebase is a native Postgres database integrated into Databricks, serving features requires no additional data movement — the same table written by the pipeline is queried directly by the model serving layer.&lt;/SPAN&gt;&lt;/P&gt;
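&lt;P&gt;&lt;SPAN&gt;As a hedged sketch of that lookup (the table and feature columns follow this pipeline, while connection handling and the exact query shape are deployment-specific), the serving layer might build its read like this:&lt;/SPAN&gt;&lt;/P&gt;

```python
from typing import Sequence

# A subset of the stateful features written by the pipeline
FEATURE_COLUMNS = [
    "fraud_score", "transactions_last_10min", "velocity_kmh", "amount_zscore",
]


def build_feature_lookup(columns: Sequence[str],
                         table: str = "transaction_features") -> str:
    """Parameterized single-row feature lookup for the serving layer."""
    return (
        f"SELECT {', '.join(columns)} FROM {table} "
        "WHERE transaction_id = %s"
    )


sql = build_feature_lookup(FEATURE_COLUMNS)
# With psycopg2 against Lakebase (connection setup omitted):
#   cur.execute(sql, (transaction_id,))
#   features = cur.fetchone()
```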
&lt;H2&gt;&lt;STRONG&gt;Job Configuration&lt;/STRONG&gt;&lt;/H2&gt;
&lt;TABLE&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Component&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Configuration&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Kafka Topic&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;4 partitions / 1,000 records per second&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Lakebase&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;1 Lakebase Provisioned Compute Unit&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Spark Cluster — Driver&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;rd-fleet.xlarge&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Spark Cluster — Workers&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;rd-fleet.xlarge × 3 nodes (12 cores)&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Streaming Shuffle Partitions&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;4&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Lakebase Flush Buffer&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;5 records&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;H2&gt;&lt;STRONG&gt;Benchmark Results&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;The following measurements were collected under a sustained load of 1,000 transactions per second over a 24-hour period.&lt;/SPAN&gt;&lt;/P&gt;
&lt;TABLE&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;Metric&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;p90&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;p99&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Kafka Source Queuing Latency (ms)&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;&amp;lt; 25&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;&amp;lt; 50&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;TWS Processing Latency (ms)&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;&amp;lt; 190&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;&amp;lt; 230&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;Lakebase Write Latency (ms)&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;&amp;lt; 130&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;SPAN&gt;&amp;lt; 210&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;End-to-End Latency (ms)&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;&amp;lt; 310&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD&gt;
&lt;P&gt;&lt;STRONG&gt;&amp;lt; 400&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;Things to Keep in Mind / Best Practices&lt;/STRONG&gt;&lt;/H2&gt;
&lt;OL&gt;
&lt;LI&gt;&lt;STRONG&gt; Choose Real-Time Mode when milliseconds matter&lt;/STRONG&gt;&lt;STRONG&gt;&lt;BR /&gt;&lt;/STRONG&gt;&lt;SPAN&gt;RTM gives you sub-second latency within the Spark Structured Streaming API you already know — no second engine required. For fraud detection, personalization, or anomaly detection, this is now the right default choice.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI&gt;&lt;STRONG&gt; Design state with TTL and explicit cleanup&lt;/STRONG&gt;&lt;STRONG&gt;&lt;BR /&gt;&lt;/STRONG&gt;&lt;SPAN&gt;Unbounded state growth will eventually crash your pipeline. Implement TTL-based expiration from the start. In this pipeline, a one-hour TTL on the consolidated user state removes inactive users' state automatically, keeping the memory footprint bounded.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI&gt;&lt;STRONG&gt; Batch your database writes&lt;/STRONG&gt;&lt;STRONG&gt;&lt;BR /&gt;&lt;/STRONG&gt;&lt;SPAN&gt;Individual Postgres writes are expensive. Even a batch size of 5–10 rows delivers order-of-magnitude improvements in write efficiency. Tune &lt;/SPAN&gt;&lt;SPAN&gt;batch_size&lt;/SPAN&gt;&lt;SPAN&gt; based on your latency budget.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI&gt;&lt;STRONG&gt; Monitor state size and processing time&lt;/STRONG&gt;&lt;STRONG&gt;&lt;BR /&gt;&lt;/STRONG&gt;&lt;SPAN&gt;Two metrics predict pipeline health: state row count (growing without bound signals TTL issues) and processing time per batch (exceeding the trigger interval signals capacity problems). Use &lt;/SPAN&gt;&lt;SPAN&gt;rtmMetrics&lt;/SPAN&gt;&lt;SPAN&gt; in &lt;/SPAN&gt;&lt;SPAN&gt;StreamingQueryProgress&lt;/SPAN&gt;&lt;SPAN&gt; to track &lt;/SPAN&gt;&lt;SPAN&gt;processingLatencyMs&lt;/SPAN&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;SPAN&gt;sourceQueuingLatencyMs&lt;/SPAN&gt;&lt;SPAN&gt;, and &lt;/SPAN&gt;&lt;SPAN&gt;e2eLatencyMs&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI&gt;&lt;STRONG&gt; Test with production-like data distributions&lt;/STRONG&gt;&lt;STRONG&gt;&lt;BR /&gt;&lt;/STRONG&gt;&lt;SPAN&gt;Synthetic data should match production: transaction amounts, merchant category weights, geographic patterns, and temporal distributions. Include edge cases — late-arriving data, duplicate records, and state expiration scenarios — before going live.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;/OL&gt;
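&lt;P&gt;&lt;SPAN&gt;As a sketch of the monitoring point above, latency checks can run off each StreamingQueryProgress payload. The helper below assumes the metric field names called out earlier (rtmMetrics, e2eLatencyMs); the exact JSON shape may vary by runtime version:&lt;/SPAN&gt;&lt;/P&gt;

```python
def check_rtm_health(progress: dict, e2e_budget_ms: float = 400.0) -> list:
    """Return human-readable alerts from one StreamingQueryProgress payload."""
    alerts = []

    # End-to-end latency against the pipeline's p99 budget
    e2e = progress.get("rtmMetrics", {}).get("e2eLatencyMs")
    if e2e is not None and e2e > e2e_budget_ms:
        alerts.append(f"e2e latency {e2e}ms exceeds {e2e_budget_ms}ms budget")

    # State row count: unbounded growth usually signals a TTL problem
    state_rows = progress.get("stateOperators", [{}])[0].get("numRowsTotal")
    if state_rows is not None and state_rows > 1_000_000:
        alerts.append(f"state has {state_rows} rows; check TTL expiration")
    return alerts


# In a running job this would be fed query.lastProgress on a schedule.
sample = {"rtmMetrics": {"e2eLatencyMs": 520.0},
          "stateOperators": [{"numRowsTotal": 42}]}
alerts = check_rtm_health(sample)
```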
&lt;H2&gt;&lt;STRONG&gt;Conclusion&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;Real-time feature engineering for fraud detection demands both speed and accuracy. Miss either, and fraudsters win.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;Apache Spark™'s &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;transformWithState&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt; API, combined with Real-Time Mode, provides the foundation for pipelines that deliver on both. By carefully designing each stage — from Kafka ingestion through stateful processing to optimized Postgres writes — this architecture achieves &lt;/SPAN&gt;&lt;STRONG&gt;sub-400ms end-to-end latency&lt;/STRONG&gt;&lt;SPAN&gt; while processing every transaction with exactly-once guarantees.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;The patterns demonstrated here — Real-Time Mode triggers, stateful processing with TTL, batch UPSERT writes, and RocksDB-backed state management — apply broadly to streaming feature engineering across industries. Whether you are detecting fraud, personalizing recommendations, or monitoring IoT sensors, these techniques provide a proven blueprint for production-scale real-time systems.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;Resources&lt;/STRONG&gt;&lt;/H2&gt;
&lt;UL&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;A href="https://docs.databricks.com/aws/en/structured-streaming/real-time" target="_blank" rel="noopener"&gt;&lt;SPAN&gt;Real-Time Mode Documentation&lt;/SPAN&gt;&lt;/A&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;A href="https://www.databricks.com/blog/real-time-mode-ultra-low-latency-streaming-spark-apis-without-second-engine" target="_blank" rel="noopener"&gt;&lt;SPAN&gt;Introducing Real-Time Mode — Databricks Blog&lt;/SPAN&gt;&lt;/A&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;A href="https://databricks.com/product/lakebase" target="_blank" rel="noopener"&gt;&lt;SPAN&gt;Databricks Lakebase&lt;/SPAN&gt;&lt;/A&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;A href="https://github.com/databricks-solutions/databricks-blogposts/pull/73" target="_blank" rel="noopener"&gt;&lt;SPAN&gt;Code Repository&lt;/SPAN&gt;&lt;/A&gt;&lt;/LI&gt;
&lt;/UL&gt;</description>
    <pubDate>Thu, 19 Mar 2026 15:16:04 GMT</pubDate>
    <dc:creator>JayPalaniappan</dc:creator>
    <dc:date>2026-03-19T15:16:04Z</dc:date>
    <item>
      <title>Fraud Detection Feature Engineering with Structured Streaming Real-Time Mode and Lakebase</title>
      <link>https://community.databricks.com/t5/technical-blog/fraud-detection-feature-engineering-with-structured-streaming/ba-p/151308</link>
      <description>&lt;P&gt;&lt;SPAN class="appsElementsGenerativeaiAstAnimated" data-ast-node-id="0"&gt;In real-time fraud detection, milliseconds separate detection from loss&lt;/SPAN&gt;&lt;SPAN class="appsElementsGenerativeaiAstAnimated" data-ast-node-id="1"&gt;.&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;SPAN class="appsElementsGenerativeaiAstAnimated" data-ast-node-id="2"&gt;Stop relying on slow batch features. This deep dive reveals how to combine Apache Spark™ Structured Streaming Real-Time Mode (RTM),&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;CODE class="appsElementsGenerativeaiAstInlineCode"&gt;transformWithState&lt;/CODE&gt;&lt;SPAN class="appsElementsGenerativeaiAstAnimated" data-ast-node-id="4"&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;(TWS), and Databricks Lakebase to compute sophisticated fraud features with guaranteed&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;STRONG&gt;&lt;SPAN class="appsElementsGenerativeaiAstAnimated" data-ast-node-id="5"&gt;sub-second end-to-end latency&lt;/SPAN&gt;&lt;/STRONG&gt;&lt;SPAN class="appsElementsGenerativeaiAstAnimated" data-ast-node-id="7"&gt;.&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/SPAN&gt;&lt;SPAN class="appsElementsGenerativeaiAstAnimated" data-ast-node-id="8"&gt;Unify your feature engineering and serving logic and simplify your architecture&lt;/SPAN&gt;&lt;SPAN class="appsElementsGenerativeaiAstAnimated" data-ast-node-id="9"&gt;.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;LI-WRAPPER&gt;&lt;/LI-WRAPPER&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 19 Mar 2026 15:16:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/technical-blog/fraud-detection-feature-engineering-with-structured-streaming/ba-p/151308</guid>
      <dc:creator>JayPalaniappan</dc:creator>
      <dc:date>2026-03-19T15:16:04Z</dc:date>
    </item>
  </channel>
</rss>

