Every second, thousands of payment transactions flow through financial networks — card swipes at checkout, online purchases, mobile payments. Behind each one, a fraud detection system must evaluate the transaction in real time and decide whether to approve or block it before the customer is even aware a check is happening. In financial services, this is a zero-tolerance domain: a missed fraud event costs real money, and a false positive means a legitimate customer is turned away at the point of sale.
What makes fraud particularly difficult to catch is that it doesn't stand still. Fraudsters adapt rapidly, probing for gaps in detection logic and shifting tactics the moment a pattern is identified. Effective detection requires more than knowing what a single transaction looks like — it requires understanding that transaction in context. How does it compare to this user's behavior over the last 10 minutes? Is the location consistent with where they transacted moments ago? Is the amount anomalous given their history at this merchant category?
This is where machine learning models come in. Modern fraud detection systems use ML models that score every transaction in real time. But the quality of those scores depends entirely on the quality of the features fed to the model — and features are only as good as they are fresh.
Feature engineering is the process of transforming raw transaction data into structured signals that an ML model can act on. A raw transaction record — a transaction ID, an amount, a merchant code, a timestamp — carries limited signal on its own. Features are derived characteristics that make behavioral patterns visible to the model.
For fraud detection, the most predictive features are those that capture a user's recent history:

- Velocity features: how many transactions the user has made in the last 10 minutes or hour
- Geographic features: the distance and implied travel speed between consecutive transactions
- Amount features: how the current amount compares to the user's rolling average, maximum, and historical distribution
These features are computed per transaction and stored in a feature store — a low-latency database that the model scoring service reads at inference time. The freshness of these features directly determines the quality of the fraud score. A velocity feature computed five minutes ago in a batch pipeline may already be stale by the time a fraudster completes their third transaction.
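To make the idea concrete, here is a minimal in-memory sketch of a velocity feature. It is illustrative only; in the pipeline built later in this post, this state lives in Spark's RocksDB-backed state store rather than a Python object.

```python
from collections import deque
from datetime import datetime, timedelta

class VelocityCounter:
    """Tracks a user's transaction timestamps and reports how many
    fall inside a sliding window (a toy stand-in for the stateful
    velocity features described later)."""

    def __init__(self, window, max_events=50):
        self.window = window
        self.events = deque(maxlen=max_events)  # bound state size

    def record(self, ts):
        """Record a transaction; return the count inside the window."""
        self.events.append(ts)
        cutoff = ts - self.window
        return sum(1 for t in self.events if t >= cutoff)

vc = VelocityCounter(window=timedelta(minutes=10))
base = datetime(2025, 8, 1, 12, 0, 0)
vc.record(base - timedelta(hours=2))          # old, outside the window
assert vc.record(base) == 1
assert vc.record(base + timedelta(minutes=3)) == 2
assert vc.record(base + timedelta(minutes=5)) == 3
```

A batch pipeline recomputing this count every five minutes would report a stale value exactly when it matters most: mid-burst.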
This is why latency is not a performance metric in fraud detection — it is a business constraint. The window between a fraudster's first transaction and detection is where losses occur. Shrinking that window from seconds to milliseconds directly translates to fewer fraudulent transactions approved.
Getting real-time features into production is harder than it sounds — and the gap between data science and production engineering is where much of the difficulty lives.
Data scientists work in Spark. Apache Spark™ is the de facto standard for feature development and model training. Data scientists prototype feature logic in PySpark, iterate quickly, and validate results against historical data. Spark handles large-scale computation efficiently, and most organizations have deep institutional knowledge of its APIs and ecosystem.
But online feature serving has historically required a second system. At inference time, features need to be served in milliseconds — far below what traditional Spark micro-batch processing can achieve. This has typically required deploying a specialized streaming engine like Apache Flink or Kafka Streams alongside Spark to power the online feature pipeline. These systems can deliver sub-second latency, but they operate very differently from Spark.
The result is a fragmented architecture with three compounding problems:

- Duplication: the same feature logic must be implemented twice, once in Spark for training and once in the streaming engine for serving
- Drift: the two implementations inevitably diverge over time, so models are scored on features that differ subtly from the ones they were trained on
- Operational tax: two engines mean two deployment pipelines, two monitoring stacks, and two sets of skills to maintain
Real-Time Mode changes this equation. By delivering sub-second latency within the Spark API that data scientists already use, RTM eliminates the need for a second engine — and with it, the duplication, drift, and operational tax that come with maintaining one.
Launched in Public Preview in August 2025, RTM is a new trigger type for Spark Structured Streaming that enables consistent sub-second latencies — as low as 5ms end-to-end — without leaving the Spark API.
Key technical innovations drive RTM's performance, most notably concurrent stage scheduling and continuous pull-based execution, contrasted with micro-batch below:
Together, these innovations transform Spark into a high-performance, low-latency engine capable of handling demanding operational workloads.
| Model | Processing Approach | Typical Latency |
|---|---|---|
| Micro-Batch | Sequential stage execution; data accumulated into periodic batches | Seconds (typically 1–5s) |
| Real-Time Mode | Concurrent stage scheduling; continuous pull-based execution | Sub-400ms (stable) |
One practical advantage of RTM is how easy it is to adopt — or to dial back. Business requirements change: a pipeline that starts with a 1-minute SLA today might need sub-second latency tomorrow, or might benefit from a daily schedule for cost savings. With Spark, that is a single line of code:
# Daily or hourly batch
.trigger(availableNow=True)

# Sub-second real-time processing
# (the "5 minutes" sets the checkpoint interval, not the latency)
.trigger(realTime="5 minutes")
This also directly solves the logic drift problem described above. When feature transformation logic lives in a single Spark codebase, there is no second implementation to drift. Models are trained and scored on exactly the same logic.
Now that we’ve covered why feature freshness is critical for fraud detection and how Real-Time Mode delivers sub-second processing within the Spark API, let’s build it end-to-end. In the walkthrough that follows, we’ll construct a complete real-time feature engineering pipeline that:

- Ingests a continuous stream of transactions from Kafka
- Computes 17 stateless features per transaction in a single streaming pass
- Maintains per-user state with transformWithState to derive velocity, geographic, and amount-anomaly features
- Writes the resulting features to Databricks Lakebase with idempotent, batched UPSERTs
The full pipeline runs end-to-end with p99 latency under 500ms. All code is available in the companion GitHub repository.
| Requirement | Details |
|---|---|
| Databricks Runtime | 17.3 LTS or later |
| Compute type | Dedicated access mode only (not standard, serverless, or Lakeflow) |
| Autoscaling | Must be disabled |
| Photon | Must be disabled |
| Spark config | spark.databricks.streaming.realTimeMode.enabled set to true |
| Unity Catalog | Enabled (for governance and lineage) |
| Kafka topic | Message broker for buffering transactions between the generator and pipeline |
| Databricks Lakebase | Serverless Postgres feature table for low-latency storage and retrieval |
The pipeline processes transactions through six distinct stages, each optimized for performance and reliability:

1. Synthetic transaction generation with dbldatagen
2. Serialization and publishing to Kafka
3. Stream ingestion using the Real-Time Mode trigger
4. Stateless feature computation
5. Stateful feature computation and fraud scoring with transformWithState
6. Batched UPSERT writes to Lakebase for online serving
The architecture uses Apache Spark™ for stream processing, Kafka for data ingestion, RocksDB for state management, and Databricks Lakebase Postgres for feature storage and serving.
What makes this architecture distinctive is the combination of transformWithState for complex stateful computations with RTM's continuous execution model — every transaction is processed at sub-second latency without sacrificing fault tolerance or exactly-once semantics.
Synthetic transaction data, generated to mimic real-world credit card activity, flows continuously into Kafka. We use the dbldatagen library to produce realistic distributions for testing and development: log-normal amount distributions (matching real spending patterns), weighted merchant categories (restaurants more common than jewelry stores), and realistic geographic coordinates.
The data generation logic is encapsulated in the TransactionDataGenerator class:
spec = (dg.DataGenerator(sparkSession=self.spark, name="transactions", rows=num_rows, partitions=1)
.withIdOutput()
...
)
df = spec.build(withStreaming=True, options={"rowsPerSecond": rows_per_second}) \
.drop("user_id_num", "merchant_id_num", "id") \
.withColumn("timestamp", current_timestamp())
The withStreaming=True option produces a continuous stream at the configured rate of 1,000 rows per second.
This class is then called in the [01_generate_streaming_data notebook → 3: Generate Streaming Transaction Data] to generate the data:
# Generate streaming transaction data
data_gen = TransactionDataGenerator(spark)
df_transactions = data_gen.generate_transaction_data(
num_users=config.data_gen_config["num_users"], # unique users
num_merchants=config.data_gen_config["num_merchants"], # unique merchants
rows_per_second=config.data_gen_config["rows_per_second"] # transactions per second
)
Serializing to Kafka:
The generated transactions are then serialized as JSON and written to Kafka. The transaction_id is used as the message key, which routes all events with the same key to the same partition and preserves their ordering, a property the downstream stateful processing relies on.
#Convert row to JSON String
json_df = df_transactions.select(to_json(struct(*[col(c) for c in df_transactions.columns])).alias("value"), col("transaction_id").alias("key"))
kafkaWriter = (
json_df
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", KAFKA_SERVER)
.option("kafka.security.protocol", "SASL_SSL")
.option("kafka.sasl.jaas.config", f"kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username='{KAFKA_USERNAME}' password='{KAFKA_SECRET}';")
.option("kafka.ssl.endpoint.identification.algorithm", "https")
.option("kafka.sasl.mechanism", "PLAIN")
.option("topic", KAFKA_TOPIC)
.option("failOnDataLoss", "true")
.option("checkpointLocation", CHECKPOINT_LOCATION)
)
#Start Kafka Producer
kafkaQuery = kafkaWriter.start()
With data flowing into Kafka, the next step is configuring the pipeline to process it with sub-second latency.
Cluster Configuration:
Set the following Spark configuration in the cluster config before starting the pipeline; it is required for RTM to function correctly. See the Databricks documentation for more details.
spark.conf.set("spark.databricks.streaming.realTimeMode.enabled", "true")
The RTM Trigger:
.trigger(realTime="5 minutes")
The "5 minutes" parameter sets the checkpoint interval — how often the state is persisted to cloud storage for fault tolerance. Between checkpoints, Spark processes incoming transactions continuously, event by event. Longer intervals mean less checkpointing overhead but a longer replay window on failure; shorter intervals increase durability at a small latency cost.
Note: RTM supports outputMode("update") only. Unlike micro-batch mode — which can buffer multiple records before emitting results — RTM emits incremental updates as each record is processed.
Stateless features are computed per transaction without any historical context. These include time-based patterns (hour of day, day of week, is-weekend flag), amount transformations (log-normalized amount, amount tier), and merchant risk indicators (high-risk category flags, merchant velocity scores). All 17 features are computed in a single streaming pass.
[02_streaming_fraud_detection_pipeline notebook → 5: Apply Stateless Features]
df_with_stateless_features = feature_engineer.apply_all_features(kafka_df)
[feature_engineering.py#apply_all_features]
def apply_all_features(self, df, include_optional=True):
df_features = self.create_time_based_features(df)
df_features = self.create_amount_features(df_features)
df_features = self.create_merchant_features(df_features)
if include_optional:
df_features = self.create_location_features(df_features)
df_features = self.create_device_features(df_features)
df_features = self.create_network_features(df_features)
return df_features
These features compute in under 75ms at p99, providing immediate per-transaction context before the stateful layer adds historical patterns.
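To illustrate what these stateless transforms look like, here is a plain-Python sketch of two of the features named in the output schema, hour_sin/hour_cos (cyclical time encoding) and amount_log. The pipeline computes these with PySpark column expressions rather than plain Python; this sketch just shows the math.

```python
import math

def cyclical_hour(hour):
    """Encode hour-of-day on the unit circle so 23:00 and 00:00 end
    up close together (the hour_sin / hour_cos features)."""
    angle = 2 * math.pi * hour / 24
    return math.sin(angle), math.cos(angle)

def amount_log(amount):
    """log1p compresses the heavy right tail of spending amounts."""
    return math.log1p(amount)

sin0, cos0 = cyclical_hour(0)
sin23, cos23 = cyclical_hour(23)
# Midnight and 23:00 are neighbours on the circle, unlike raw 0 vs 23
assert abs(sin0 - sin23) < 0.3 and abs(cos0 - cos23) < 0.05
assert amount_log(0.0) == 0.0
```

The cyclical encoding matters for fraud models because late-night activity patterns wrap around midnight; a raw hour feature would place 23:00 and 00:00 at opposite ends of the range.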
Fraud patterns rarely appear in a single transaction. Velocity spikes, impossible travel, and amount anomalies only become visible when you look across a user's transaction history. That requires maintaining per-user state across events — and that is where transformWithState comes in.
Apache Spark™'s transformWithState API is purpose-built for this kind of workload:

- Arbitrary per-key state (value, list, and map state types) that persists across events
- Built-in state TTL, so state for inactive users expires automatically
- Timers for processing-time and event-time callbacks
- An object-oriented processor interface that keeps all feature logic in one class
Built on RocksDB, transformWithState handles high-cardinality workloads efficiently while preserving Structured Streaming's exactly-once guarantees.
Each user's state stores a rich context object: recent transaction history, last-known location, IP address history, rolling amount totals, and velocity counters. Storing all of this in a single state object means every lookup is local — no external service calls, no added latency.
[feature_engineering.py#FraudDetectionFeaturesProcessor]
state_schema = StructType([
StructField("transaction_count", IntegerType(), False),
StructField("last_timestamp", TimestampType(), True),
StructField("last_ip_address", StringType(), True),
StructField("last_latitude", DoubleType(), True),
StructField("last_longitude", DoubleType(), True),
StructField("ip_change_count", IntegerType(), False),
StructField("total_amount", DoubleType(), False),
StructField("avg_amount", DoubleType(), False),
StructField("max_amount", DoubleType(), False),
StructField("recent_timestamps", ArrayType(TimestampType()), False),
StructField("recent_amounts", ArrayType(DoubleType()), False)
])
The FraudDetectionFeaturesProcessor encapsulates all stateful computations: transaction velocity, IP change tracking, geographic anomalies (impossible travel detection), and amount-based anomaly scoring. The pipeline calls handleInputRows for each transaction, which updates state and emits the computed features.
Important RTM behavior: In Real-Time Mode, handleInputRows is called once per record. The inputRows iterator returns a single value, unlike micro-batch mode where multiple records per key may be batched together. Design your processor accordingly.
[feature_engineering.py#FraudDetectionFeaturesProcessor]
import builtins
from datetime import datetime, timedelta
from typing import Iterator

from pyspark.sql import Row
from pyspark.sql.streaming import StatefulProcessor
from pyspark.sql.types import (
    ArrayType, DoubleType, IntegerType, StringType,
    StructField, StructType, TimestampType,
)

class FraudDetectionFeaturesProcessor(StatefulProcessor):
    """
    Stateful processor for real-time fraud detection using
    ``transformWithState``.

    The processor maintains a single consolidated state per user and
    computes a rich set of fraud-related features, including:

    * Transaction velocity (last hour / last 10 min)
    * IP address change tracking
    * Geographic anomalies (impossible travel)
    * Amount-based anomalies (z-score, ratio to user avg/max)

    Recent-transaction buffers are capped at 50 entries to bound
    per-user state size.
    """
def init(self, handle) -> None:
"""
Initialize the processor with a single consolidated ``ValueState``.
"""
self.handle = handle
# Consolidated state schema (one object per user)
state_schema = StructType([
StructField("transaction_count", IntegerType(), False),
StructField("last_timestamp", TimestampType(), True),
StructField("last_ip_address", StringType(), True),
StructField("last_latitude", DoubleType(), True),
StructField("last_longitude", DoubleType(), True),
StructField("ip_change_count", IntegerType(), False),
StructField("total_amount", DoubleType(), False),
StructField("avg_amount", DoubleType(), False),
StructField("max_amount", DoubleType(), False),
StructField("recent_timestamps", ArrayType(TimestampType()), False),
StructField("recent_amounts", ArrayType(DoubleType()), False)
])
        self.user_state = handle.getValueState(
            "user_fraud_state",
            state_schema,
            ttl_duration_ms=3600000  # 1 hour TTL
        )
def handleInputRows(self, key, rows, timer_values) -> Iterator[Row]:
"""
Process a batch of transactions for a single ``user_id`` and emit
fraud‑detection features.
"""
import numpy as np # Local import – only needed here
user_id, = key
# Sort incoming rows by timestamp without an intermediate mutable list
row_list = sorted(rows, key=lambda r: r["timestamp"])
if not row_list:
return
# Load existing state or initialise defaults
if self.user_state.exists():
(
prev_count,
prev_last_time,
prev_ip,
prev_lat,
prev_lon,
prev_ip_changes,
prev_total_amount,
prev_avg_amount,
prev_max_amount,
prev_times,
prev_amounts,
) = self.user_state.get()
prev_times = list(prev_times) if prev_times else []
prev_amounts = list(prev_amounts) if prev_amounts else []
else:
prev_count = 0
prev_last_time = None
prev_ip = None
prev_lat = None
prev_lon = None
prev_ip_changes = 0
prev_total_amount = 0.0
prev_avg_amount = 0.0
prev_max_amount = 0.0
prev_times = []
prev_amounts = []
results = []
for row in row_list:
current_time = row["timestamp"]
current_ip = row["ip_address"]
current_lat = row["latitude"]
current_lon = row["longitude"]
current_amount = row["amount"]
# Update counters
prev_count += 1
# Time delta since previous transaction
if prev_last_time is not None:
# Ensure both datetimes are offset-naive
if hasattr(current_time, 'tzinfo') and current_time.tzinfo is not None:
current_time = current_time.replace(tzinfo=None)
if hasattr(prev_last_time, 'tzinfo') and prev_last_time.tzinfo is not None:
prev_last_time = prev_last_time.replace(tzinfo=None)
time_diff = (current_time - prev_last_time).total_seconds()
else:
time_diff = None
# IP change detection
ip_changed = 0
if prev_ip is not None and current_ip != prev_ip:
ip_changed = 1
prev_ip_changes += 1
# Geographic distance & velocity
distance_km = None
velocity_kmh = None
if prev_lat is not None and prev_lon is not None:
distance_km = calculate_haversine_distance(
prev_lat, prev_lon, current_lat, current_lon
)
if distance_km is not None and time_diff and time_diff > 0:
velocity_kmh = (distance_km / time_diff) * 3600
# Capture historical stats BEFORE updating with the current transaction
# so anomaly ratios measure how current_amount compares to past behaviour.
historical_avg_amount = prev_avg_amount
historical_max_amount = prev_max_amount
amount_vs_avg_ratio = (
current_amount / historical_avg_amount if historical_avg_amount > 0 else 1.0
)
amount_vs_max_ratio = (
current_amount / historical_max_amount if historical_max_amount > 0 else 1.0
)
# Z‑score (requires at least 3 prior amounts)
amount_zscore = None
if len(prev_amounts) >= 3:
std = np.std(prev_amounts)
if std > 0 and historical_avg_amount is not None:
amount_zscore = (current_amount - historical_avg_amount) / std
# Update amount statistics with the current transaction for future rows
prev_total_amount += current_amount
prev_avg_amount = prev_total_amount / prev_count
prev_max_amount = builtins.max(prev_max_amount, current_amount)
# Update recent‑transaction buffers (capped at 50)
prev_times.append(current_time)
prev_amounts.append(current_amount)
if len(prev_times) > 50:
prev_times = prev_times[-50:]
prev_amounts = prev_amounts[-50:]
# Transaction counts in sliding windows
one_hour_ago = current_time - timedelta(hours=1)
ten_min_ago = current_time - timedelta(minutes=10)
trans_last_hour = builtins.sum(1 for t in prev_times if t >= one_hour_ago)
trans_last_10min = builtins.sum(1 for t in prev_times if t >= ten_min_ago)
# Fraud indicators
is_rapid = 1 if trans_last_10min >= 5 else 0
is_impossible_travel = (
1 if velocity_kmh is not None and velocity_kmh > 800 else 0
)
is_amount_anomaly = (
1 if amount_zscore is not None and builtins.abs(amount_zscore) > 3 else 0
)
# Fraud score (capped at 100)
fraud_score = 0.0
fraud_score += 20 if is_rapid else 0
fraud_score += 30 if is_impossible_travel else 0
fraud_score += 25 if is_amount_anomaly else 0
fraud_score += 15 if prev_ip_changes >= 5 else 0
fraud_score += 10 if trans_last_hour >= 10 else 0
fraud_score = builtins.min(fraud_score, 100.0)
is_fraud_pred = 1 if fraud_score >= 50 else 0
processing_timestamp = datetime.now()
# Assemble output row
results.append(
Row(
transaction_id=row["transaction_id"],
user_id=user_id,
timestamp=current_time,
amount=current_amount,
merchant_id=row["merchant_id"],
merchant_category=row["merchant_category"],
currency=row["currency"],
payment_method=row["payment_method"],
card_type=row["card_type"],
device_id=row["device_id"],
ip_address=current_ip,
latitude=current_lat,
longitude=current_lon,
# Stateless features (pass-through)
month=row["month"],
hour=row["hour"],
day_of_week=row["day_of_week"],
is_business_hour=row["is_business_hour"],
is_weekend=row["is_weekend"],
is_night=row["is_night"],
hour_sin=row["hour_sin"],
hour_cos=row["hour_cos"],
amount_log=row["amount_log"],
amount_category=row["amount_category"],
is_round_amount=row["is_round_amount"],
is_exact_amount=row["is_exact_amount"],
merchant_risk_score=row["merchant_risk_score"],
is_private_ip=row["is_private_ip"],
# Stateful features
user_transaction_count=prev_count,
transactions_last_hour=trans_last_hour,
transactions_last_10min=trans_last_10min,
ip_changed=ip_changed,
ip_change_count_total=prev_ip_changes,
distance_from_last_km=distance_km,
velocity_kmh=velocity_kmh,
amount_vs_user_avg_ratio=amount_vs_avg_ratio,
amount_vs_user_max_ratio=amount_vs_max_ratio,
amount_zscore=amount_zscore,
seconds_since_last_transaction=time_diff,
is_rapid_transaction=is_rapid,
is_impossible_travel=is_impossible_travel,
is_amount_anomaly=is_amount_anomaly,
fraud_score=fraud_score,
is_fraud_prediction=is_fraud_pred,
processing_timestamp=processing_timestamp
)
)
# Update state for next iteration
prev_last_time = current_time
prev_ip = current_ip
prev_lat = current_lat
prev_lon = current_lon
# Persist consolidated state (atomic update)
self.user_state.update(
(
prev_count,
prev_last_time,
prev_ip,
prev_lat,
prev_lon,
prev_ip_changes,
prev_total_amount,
prev_avg_amount,
prev_max_amount,
prev_times,
prev_amounts,
)
)
# Emit results
for row in results:
yield row
def close(self) -> None:
"""No cleanup required."""
pass
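The processor above calls calculate_haversine_distance, a helper not shown in the excerpt. A standard haversine implementation, sketching what that helper presumably looks like:

```python
import math

def calculate_haversine_distance(lat1, lon1, lat2, lon2):
    """Great-circle distance in kilometres between two (lat, lon)
    points, using the standard haversine formula."""
    r = 6371.0  # mean Earth radius, km
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    return 2 * r * math.asin(math.sqrt(a))

# New York -> Los Angeles is roughly 3,900 km; two transactions at
# those coordinates minutes apart imply a velocity far above the
# 800 km/h impossible-travel threshold.
dist = calculate_haversine_distance(40.71, -74.01, 34.05, -118.24)
assert 3800 < dist < 4100
```

The 800 km/h threshold used by the processor roughly corresponds to commercial airliner cruising speed, so anything faster is physically implausible.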
[02_streaming_fraud_detection_pipeline notebook → 6: Apply Stateful Features]
df_with_fraud_features = df_with_stateless_features \
.groupBy("user_id") \
.transformWithState(
statefulProcessor=FraudDetectionFeaturesProcessor(),
outputStructType=get_fraud_detection_output_schema(),
outputMode="update",
timeMode="processingTime"
)
The composite fraud score (0–100) produced by this stage flows directly into Lakebase, ready to be consumed by the fraud model at inference time.
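Because the score is a simple additive rule set, it can be distilled into a standalone function for unit testing outside the streaming pipeline. This sketch mirrors the weights in the processor above:

```python
def composite_fraud_score(is_rapid, is_impossible_travel, is_amount_anomaly,
                          ip_change_count, trans_last_hour):
    """Additive rule score, capped at 100, mirroring the weights used
    in FraudDetectionFeaturesProcessor (20/30/25/15/10)."""
    score = 0.0
    score += 20 if is_rapid else 0
    score += 30 if is_impossible_travel else 0
    score += 25 if is_amount_anomaly else 0
    score += 15 if ip_change_count >= 5 else 0
    score += 10 if trans_last_hour >= 10 else 0
    return min(score, 100.0)

# Impossible travel plus an amount anomaly crosses the 50-point
# prediction threshold; all indicators together hit the cap.
assert composite_fraud_score(0, 1, 1, 0, 0) == 55.0
assert composite_fraud_score(1, 1, 1, 6, 12) == 100.0
```

Keeping the scoring logic in a pure function like this makes it easy to validate threshold changes before redeploying the streaming job.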
Lakebase serves as both the operational feature store and transactional database, integrated directly into the Lakehouse. This eliminates the need for a separate feature store service and the data synchronization overhead that typically comes with it.
At 1,000 transactions per second, writing records to Postgres one by one generates 1,000 round-trips per second. At that volume, individual writes would overwhelm Lakebase and introduce seconds of latency. The solution is batching with idempotency.
Spark Structured Streaming's ForeachWriter provides a hook for custom per-row write logic in a streaming query. Each Spark partition creates its own ForeachWriter instance, which opens its own Lakebase PostgreSQL connection, accumulates a small configurable batch of rows, and flushes them as a single UPSERT, keeping both latency and connection count low.
[lakebase_client.py#LakebaseForeachWriter]
import logging
import time
from typing import List

import psycopg2
from psycopg2 import DatabaseError, OperationalError
from psycopg2.extras import execute_batch

logger = logging.getLogger(__name__)

class LakebaseForeachWriter:
"""
ForeachWriter for per-partition streaming writes to Lakebase
Usage:
writer = lakebase_client.get_foreach_writer(
table_name="transaction_features",
conflict_columns=["transaction_id"]
)
query = df.writeStream.foreach(writer).start()
"""
def __init__(self, creds, database, table_name: str, column_names: List[str],
conflict_columns: List[str], batch_size: int = 100):
"""
Initialize ForeachWriter
Args:
creds: lakebase credentials
database: database name
table_name: Target table name
column_names: Columns to be inserted or updated
conflict_columns: Columns for ON CONFLICT clause (e.g., ["transaction_id"])
batch_size: Number of rows to accumulate before writing
"""
if not column_names:
raise ValueError("column_names must be provided and non-empty")
self.creds = creds
self.database = database
self.table_name = table_name
self.column_names = column_names
self.conflict_columns = conflict_columns
# Build upsert SQL
self.columns_str = ','.join(self.column_names)
self.placeholders = ','.join(['%s'] * len(self.column_names))
self.conflict_str = ','.join(self.conflict_columns)
self.update_cols = [col for col in self.column_names if col not in self.conflict_columns]
if self.update_cols:
self.update_str = ','.join([f"{col}=EXCLUDED.{col}" for col in self.update_cols])
self.sql = f"""
INSERT INTO {self.table_name} ({self.columns_str})
VALUES ({self.placeholders})
ON CONFLICT ({self.conflict_str}) DO UPDATE SET {self.update_str}
"""
else:
self.update_str = ""
self.sql = f"""
INSERT INTO {self.table_name} ({self.columns_str})
VALUES ({self.placeholders})
ON CONFLICT ({self.conflict_str}) DO NOTHING
"""
self.batch_size = batch_size
self.max_retries = 3
self.partition_id = None
self.epoch_id = None
# Per-partition state
self.conn = None
self.cursor = None
self.current_batch = []
def open(self, partition_id, epoch_id):
"""
Open database connection for this partition and epoch
Called once per partition at the start of each micro-batch. Opens a new
PostgreSQL connection with autocommit=False for transactional safety.
Args:
partition_id: Partition ID
epoch_id: Streaming epoch ID
Returns:
bool: True if connection successful, False otherwise
"""
try:
logger.info(f"Opening connection for partition {partition_id}, epoch {epoch_id}")
self.partition_id = partition_id
self.epoch_id = epoch_id
self.conn = psycopg2.connect(
host=self.creds['host'],
port=self.creds['port'],
dbname=self.database,
user=self.creds['user'],
password=self.creds['password'],
connect_timeout=10,
sslmode='require'
)
self.conn.autocommit = False
self.cursor = self.conn.cursor()
self.current_batch = []
return True
except Exception as e:
logger.error(f"Error opening connection for partition {partition_id}: {e}")
return False
def process(self, row_tuple):
"""
Process a single row from the streaming query
Accumulates rows in a buffer. When the buffer reaches batch_size,
triggers an UPSERT batch write to PostgreSQL.
Args:
row_tuple: Tuple containing row values
Raises:
Exception: If row processing fails
"""
try:
            # column_names is validated in __init__; fail fast if unset
            if self.column_names is None:
                raise ValueError("column_names not initialized")
self.current_batch.append(row_tuple)
# Flush when batch is full
if len(self.current_batch) >= self.batch_size:
self._execute_batch()
except Exception as e:
logger.error(f"Error processing row: {e}")
raise
def close(self, error):
"""
Close connection and flush remaining batch
Called once per partition at the end of the micro-batch. Flushes any
remaining rows, commits or rolls back based on error status, and
closes the database connection.
Args:
error: Exception if processing failed, None otherwise
"""
try:
if error is None and self.current_batch:
self._execute_batch()
elif error is not None:
logger.error(f"Error in partition, rolling back: {error}")
if self.conn:
self.conn.rollback()
except Exception as e:
logger.error(f"Error during close: {e}")
finally:
if self.cursor:
try:
self.cursor.close()
except Exception:
pass
if self.conn:
try:
logger.info(f"Closing connection for partition {self.partition_id}, epoch {self.epoch_id}")
self.conn.close()
except Exception:
pass
def _execute_batch(self):
"""
Execute the accumulated batch with retry logic
Performs UPSERT operation (INSERT ... ON CONFLICT DO UPDATE) on all
accumulated rows. Uses execute_batch for efficient bulk operations.
Includes exponential backoff retry logic for transient errors.
"""
if not self.current_batch:
return
def _execute():
start_time = time.time()
logger.info(f"Writing batch of {len(self.current_batch)} rows...")
execute_batch(self.cursor, self.sql, self.current_batch, page_size=self.batch_size)
self.conn.commit()
self.current_batch = []
elapsed = time.time() - start_time
logger.info(f"Batch complete in {elapsed:.2f}s")
# Execute with retry
self._with_retry(_execute)
def _with_retry(self, fn):
"""
Execute function with retry logic and exponential backoff
Args:
fn: Function to execute (should be a no-arg callable)
Raises:
Exception: If all retries fail
"""
for attempt in range(self.max_retries):
try:
fn()
return
except (OperationalError, DatabaseError) as e:
if attempt < self.max_retries - 1:
logger.warning(f"Retry {attempt + 1}/{self.max_retries} after error: {e}")
time.sleep(2 ** attempt) # Exponential backoff
else:
logger.error(f"Failed after {self.max_retries} retries")
raise
The ON CONFLICT DO UPDATE clause makes writes idempotent: if a batch is reprocessed due to a failure, it updates existing rows rather than creating duplicates. This is what preserves exactly-once semantics through the full pipeline.
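The idempotency property is easy to verify in isolation. SQLite supports the same ON CONFLICT ... DO UPDATE clause as Postgres (since SQLite 3.24), so a replayed batch can be simulated without a database server:

```python
import sqlite3

# In-memory table with the same conflict key the pipeline uses
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE transaction_features (
        transaction_id TEXT PRIMARY KEY,
        fraud_score    REAL
    )
""")

upsert = """
    INSERT INTO transaction_features (transaction_id, fraud_score)
    VALUES (?, ?)
    ON CONFLICT (transaction_id)
    DO UPDATE SET fraud_score = excluded.fraud_score
"""

batch = [("txn-001", 55.0), ("txn-002", 0.0)]
conn.executemany(upsert, batch)  # first delivery
conn.executemany(upsert, batch)  # replay after a simulated failure

count, top = conn.execute(
    "SELECT COUNT(*), MAX(fraud_score) FROM transaction_features"
).fetchone()
assert (count, top) == (2, 55.0)  # no duplicates; latest values win
```

Replaying the same batch leaves the table unchanged, which is exactly the behavior that lets Spark safely reprocess records after a checkpoint recovery.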
Batch size trade-offs:
| Batch size | Profile |
|---|---|
| 5–10 rows | Lower latency, higher per-write overhead |
| 100–500 rows | Higher throughput, increased latency |
For this fraud detection pipeline, batch_size=5 is used to minimize latency.
[02_streaming_fraud_detection_pipeline notebook → 7: Write to Lakebase PostgreSQL via ForeachWriter]
#Get Schema from dataframe
table_schema = df_with_fraud_features.schema
#Initialize lakebase writer
lakebase_writer = lakebase.get_foreach_writer(column_names=table_schema.names, batch_size=5)
#Start streaming query
query = df_with_fraud_features \
.writeStream \
.outputMode("update") \
.foreach(lakebase_writer) \
.option("checkpointLocation", FRAUD_PIPELINE_CHECKPOINT_LOCATION) \
.trigger(realTime="5 minutes") \
.start()
Once features are written to Lakebase, they are available for low-latency reads by the fraud model during inference. Because Lakebase is a native Postgres database integrated into Databricks, serving features requires no additional data movement — the same table written by the pipeline is queried directly by the model serving layer.
| Component | Configuration |
|---|---|
| Kafka Topic | 4 partitions / 1,000 records per second |
| Lakebase | 1 Lakebase Provisioned Compute Unit |
| Spark Cluster (Driver) | rd-fleet.xlarge |
| Spark Cluster (Workers) | rd-fleet.xlarge × 3 nodes (12 cores) |
| Streaming Shuffle Partitions | 4 |
| Lakebase Flush Buffer | 5 records |
The following measurements were collected under a sustained load of 1,000 transactions per second over a 24-hour period.
| Metric | p90 | p99 |
|---|---|---|
| Kafka Source Queuing Latency (ms) | < 25 | < 50 |
| TWS Processing Latency (ms) | < 190 | < 230 |
| Lakebase Write Latency (ms) | < 130 | < 210 |
| End-to-End Latency (ms) | < 310 | < 400 |
Real-time feature engineering for fraud detection demands both speed and accuracy. Miss either, and fraudsters win.
Apache Spark™'s transformWithState API, combined with Real-Time Mode, provides the foundation for pipelines that deliver on both. By carefully designing each stage — from Kafka ingestion through stateful processing to optimized Postgres writes — this architecture achieves sub-400ms end-to-end latency while processing every transaction with exactly-once guarantees.
The patterns demonstrated here — Real-Time Mode triggers, stateful processing with TTL, batch UPSERT writes, and RocksDB-backed state management — apply broadly to streaming feature engineering across industries. Whether you are detecting fraud, personalizing recommendations, or monitoring IoT sensors, these techniques provide a proven blueprint for production-scale real-time systems.