Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
srikanth-kilari
Databricks Employee

1. Introduction

The Web3 landscape is rapidly expanding: total cryptocurrency market capitalization is currently equivalent to around 3% of global Gross Domestic Product (GDP), and that share is expected to grow. While cryptocurrencies are a major component of Web3, the ecosystem extends far beyond them, encompassing decentralized finance (DeFi), smart contracts, digital identity, and more. As more countries explore crypto reserves, Central Bank Digital Currencies (CBDCs), and decentralized infrastructure, Web3 is shifting from an experimental frontier to a core layer of modern digital economies. Blockchain networks now power real-time applications across trading, lending, identity, gaming, and asset management. Recent approvals of crypto ETFs are accelerating mainstream adoption by bridging digital assets with traditional financial markets.

This ecosystem operates around the clock, every day of the year. Prices change every second, trades execute in microseconds, and decentralized networks continuously emit high-frequency transaction data from thousands of sources, producing massive volumes of data every day. To build responsive and insightful trading analytics platforms, it is critical to process this data efficiently, reliably, and at scale. Centralized Exchanges (CEXs), Decentralized Exchanges (DEXs), and blockchains expose APIs and WebSocket feeds that stream real-time data, but ingesting and unifying it across heterogeneous systems remains challenging.

This blog demonstrates how Databricks can harness streaming Web3 data by leveraging the Python Data Source API, which enables direct ingestion from external APIs such as centralized exchanges, decentralized protocols, and blockchain networks. This approach allows Web3 data to flow into Delta Lake without relying on complex intermediate infrastructure. Combined with Databricks Workflows and DLT (Delta Live Tables), the architecture supports scalable analytics for computing trading indicators such as Moving Average Convergence Divergence (MACD) and the Relative Strength Index (RSI) in near real-time.

 

Disclaimer: This blog is intended for educational purposes only. It demonstrates technical approaches to ingesting and analyzing real-time Web3 data using Databricks. It does not constitute financial advice, and Databricks does not endorse or recommend any particular exchange, decentralized protocol, cryptocurrency, or blockchain token.


 

2. Architecture Overview

This solution follows the Medallion Architecture pattern with Bronze, Silver, and Gold layers to enable scalable and modular streaming analytics for Web3 data. 

WEB3 Blog.png

The Bronze layer is implemented using Structured Streaming via the Python Data Source API. It ingests real-time data from public Web3 endpoints including Binance (CEX), Uniswap (DEX), and the Ethereum blockchain. Each source is connected through a custom streaming reader that handles offset tracking and polling via REST, GraphQL, or JSON-RPC APIs. The ingested data is written continuously into Delta tables. As of the time of writing, Databricks serverless compute does not support Structured Streaming jobs with time-based trigger intervals. Because of this limitation, the Bronze streaming jobs run on classic compute clusters to maintain uninterrupted ingestion.

The Silver and Gold layers are built using DLT. Since the Bronze tables are managed outside of the DLT pipeline, intermediate views are introduced within the pipeline to read from them. These views apply lightweight transformations such as type casting, timestamp normalization, and schema standardization. The Silver tables are maintained using SCD Type 1 semantics to ensure deduplicated, up-to-date records based on primary keys and event timestamps.

The Gold layer consists of materialized views (MVs) that aggregate and correlate the curated Silver data. These MVs compute meaningful trading and network metrics such as price trends, trade volume, swap activity, gas fees, MACD, RSI, and slippage between CEX and DEX prices. By joining across Silver sources on a common time bucket, the Gold layer enables synchronized and unified Web3 analytics.

All processing is orchestrated end-to-end using Databricks Workflows. Structured Streaming ingestion is handled using classic compute, while downstream Silver and Gold transformations are processed on a serverless DLT pipeline. The solution captures unique identifiers such as trade IDs, swap IDs, and block numbers during ingestion. These identifiers provide the foundation for detecting potential data gaps that may occur when live APIs only expose a limited window of recent events. If any missing records are identified, separate historical backfill processes can be executed using the corresponding history APIs to retrieve the data. This approach ensures accuracy and completeness in the data pipeline without introducing complexity into the main streaming ingestion flow.
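
The gap detection described above can be illustrated with a small, Spark-free sketch. It assumes the captured identifiers (for example trade IDs or block numbers) are consecutive integers when no events are missed; the helper name find_id_gaps is hypothetical and not part of the pipeline.

```python
def find_id_gaps(ids):
    """Return inclusive (first_missing, last_missing) ranges between observed IDs.

    Assumption: IDs are integers that increase by exactly 1 when nothing is missed.
    """
    ordered = sorted(set(ids))  # de-duplicate and order the observed IDs
    gaps = []
    for prev, cur in zip(ordered, ordered[1:]):
        if cur - prev > 1:
            gaps.append((prev + 1, cur - 1))
    return gaps


# IDs 103-104 and 107-109 were never observed by the live reader.
missing = find_id_gaps([101, 102, 105, 106, 110])
print(missing)
```

Each returned range could then be handed to a separate backfill job, keeping the live reader unchanged.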

This architecture is designed for extensibility. Additional data sources such as other CEXs (e.g., OKX, Kraken), DEXs (e.g., Jupiter, Meteora), or alternative blockchains (e.g., Solana, Avalanche) can be integrated by implementing new reader classes and extending the Bronze, Silver, and Gold layers accordingly. The modular nature of the Medallion Architecture allows these sources to plug into the existing pipeline with minimal disruption. The pipeline can also ingest other asset pairs (e.g., BTC/USDT, SOL/USDC) by supplying the appropriate parameters to the Python Data Source readers. The reader classes for CEX, DEX, and blockchain data are implemented as reusable components that can be packaged and shared across multiple pipelines, enabling a standardized and maintainable ingestion framework as datasets and use cases expand.

Starting with Databricks Runtime 15.3, the Python Data Source API is officially supported within DLT, making it feasible to define custom Python-based sources for both batch and streaming workloads directly in a pipeline. However, this blog runs the streaming logic outside of DLT, in separate Structured Streaming jobs, as that path was more stable at the time of writing.

 

3. Streaming Web3 Data with Structured Streaming via Python Data Source API

Databricks’ Python Data Source API enables direct ingestion from many sources, including event-driven APIs, without requiring intermediate messaging systems. This makes it well suited to Web3 sources like centralized exchanges (CEXs), decentralized exchanges (DEXs), and blockchain networks. Each custom source is defined using:

  • A DataSource class to register the source and define the schema
  • A SimpleDataSourceStreamReader class to implement polling, offset tracking, and row emission

This section outlines how data is streamed from three Web3 systems and persisted to Bronze Delta tables.

 

3a) Binance - CEX Trades via REST API

Binance exposes real-time trade data for crypto pairs through its REST API. This solution streams recent trades for the ETH/USDC pair using the /api/v3/trades endpoint. Each trade includes a unique trade ID, which is used as the offset to track progress and avoid duplicates. Since this endpoint returns only the most recent trades (up to 500 per request in this example), the ingestion logic captures the trade ID so that missing events can be detected. This allows for optional backfill through historical APIs as a separate process if needed, while keeping the live reader simple and lightweight. The schema includes fields for trade ID, pair symbol, price, quantity, and event timestamp.

Stream Reader Class: 

This stream reader polls the public endpoint /api/v3/trades, which doesn't require authentication, using the trade ID as the offset to track progress. The initialOffset() method sets the starting position (here, 0), and read() fetches the latest trades, emits only those with an ID greater than the last one seen, and returns the new offset based on the highest trade ID processed. This ensures safe resumption across micro-batches while avoiding duplicate trades.

import requests
from datetime import datetime
from pyspark.sql.datasource import SimpleDataSourceStreamReader, DataSource
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
class BinanceSimpleStreamReader(SimpleDataSourceStreamReader):
   def __init__(self, schema: StructType, options: dict):
       super().__init__()
       self.schema = schema
       self.symbol = options.get("symbol", "ETHUSDC")

   def initialOffset(self):
       # Start from the beginning
       return {"last_trade_id": 0}

   def read(self, start: dict):
       last_seen_id = int(start.get("last_trade_id", 0))
       url = f"https://api.binance.us/api/v3/trades?symbol={self.symbol}&limit=500"
       try:
           response = requests.get(url, timeout=10)  # timeout keeps a stalled connection from blocking the micro-batch
           response.raise_for_status()
           trades = response.json()
           new_data = []
           new_max_id = last_seen_id
           for trade in trades:
               trade_id = trade["id"]
               if trade_id > last_seen_id:
                   row = (
                       str(trade["id"]),        # trade_id
                       f"{self.symbol[:3]}/{self.symbol[3:]}",
                       float(trade["price"]),
                       float(trade["qty"]),
                       datetime.utcfromtimestamp(trade["time"] / 1000).isoformat()
                   )
                   new_data.append(row)
                   new_max_id = max(new_max_id, trade_id)
           # Only advance the offset if we actually got new data
           if new_data:
               return new_data, {"last_trade_id": new_max_id}
           else:
               return [], start
       except Exception as e:
           print("Error fetching Binance trades:", str(e))
           return [], start
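
The offset handling inside read() can be exercised without Spark or network access. The sketch below isolates that logic in a hypothetical helper, select_new_trades (not part of the reader above), and assumes each payload item carries an integer id field, as the reader does.

```python
def select_new_trades(trades, last_seen_id):
    """Keep only trades newer than the stored offset and compute the next offset.

    `trades` mimics the JSON list returned by the trades endpoint; only the
    integer "id" field is inspected here.
    """
    new_rows = [t for t in trades if t["id"] > last_seen_id]
    # If nothing is new, the offset stays where it was.
    next_id = max((t["id"] for t in new_rows), default=last_seen_id)
    return new_rows, {"last_trade_id": next_id}


payload = [{"id": 5}, {"id": 6}, {"id": 7}]
rows, offset = select_new_trades(payload, last_seen_id=5)
print(rows, offset)

# Polling again with the same payload yields no rows and the same offset.
rows_again, offset_again = select_new_trades(payload, last_seen_id=offset["last_trade_id"])
print(rows_again, offset_again)
```

Because the offset only moves forward when new IDs appear, repeated polls of an unchanged payload emit nothing.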

Data Source Class:

class BinanceDataSource(DataSource):
   @classmethod
   def name(cls):
       return "binance_trade"

   def __init__(self, options):
       self.options = options

   def schema(self):
       return StructType([
           StructField("trade_id", StringType(), True),   
           StructField("symbol", StringType(), True),
           StructField("price", DoubleType(), True),
           StructField("quantity", DoubleType(), True),
           StructField("timestamp", StringType(), True),
       ])

   def simpleStreamReader(self, schema: StructType):
       return BinanceSimpleStreamReader(schema, self.options)
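
One detail to watch when passing other symbols through the symbol option: the reader formats the pair by slicing the first three characters, which works for ETHUSDC but not for base assets with longer tickers. A possible alternative, sketched below, splits on a known quote asset instead. The KNOWN_QUOTES list and the split_symbol helper are assumptions for illustration, not part of the original reader.

```python
# Hypothetical list of quote assets; extend it for the markets being ingested.
KNOWN_QUOTES = ("USDT", "USDC", "USD", "BTC", "ETH")


def split_symbol(symbol, quotes=KNOWN_QUOTES):
    """Format an exchange symbol such as 'ETHUSDC' as 'ETH/USDC'."""
    # Try longer quote tickers first so 'USDT' is matched before 'USD'.
    for quote in sorted(quotes, key=len, reverse=True):
        if symbol.endswith(quote) and len(symbol) > len(quote):
            return f"{symbol[:-len(quote)]}/{quote}"
    raise ValueError(f"Unrecognized quote asset in symbol: {symbol!r}")


print(split_symbol("ETHUSDC"))
print(split_symbol("DOGEUSDT"))
```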

In the above Binance example, the reader extends SimpleDataSourceStreamReader, which is appropriate for lower-volume or single-threaded data sources that do not require partitioning. For higher-throughput ingestion or when dealing with large historical ranges, it is often beneficial to define partitions that split the data into manageable time windows or shards. The Uniswap and Ethereum readers demonstrate this by extending DataSourceStreamReader and implementing a custom InputPartition class. This enables parallel reads from the source system, provided the underlying data is partitionable (e.g., by timestamp or by another offset the API can filter on). Choosing between the simple and standard readers depends on the expected data volume and whether parallelism is required for performance.
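
As a rough illustration of splitting an offset range into partitions, the sketch below cuts a timestamp range into fixed-size windows. The function name plan_windows and the 60-second window are assumptions; each returned pair is meant to be read as an exclusive start and an inclusive end, and a partitions() method could wrap each pair in an InputPartition.

```python
def plan_windows(start_ts, end_ts, window_seconds):
    """Split (start_ts, end_ts] into consecutive (lo, hi] windows."""
    if window_seconds <= 0:
        raise ValueError("window_seconds must be positive")
    windows = []
    lo = start_ts
    while lo < end_ts:
        hi = min(lo + window_seconds, end_ts)  # last window may be shorter
        windows.append((lo, hi))
        lo = hi
    return windows


print(plan_windows(100, 250, 60))
```

An empty list signals that no new data is available, which lets a micro-batch finish without scheduling any read tasks.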

 

3b) Uniswap - DEX Swaps via The Graph

Uniswap V3 data is accessed through The Graph’s GraphQL API, which exposes recent swaps in each pool. For this use case, swaps from the ETH/USDC pool are streamed using the swap timestamp as the offset. The reader polls the subgraph with a time-based filter to retrieve only new swaps. The last-seen timestamp is persisted in the offset state, ensuring deterministic and replay-safe ingestion. Since Uniswap swaps are immutable and timestamped, this model is reliable and needs no state beyond the offset itself. The schema includes the swap ID, trading pair, USD amount, ETH and USDC amounts, and the timestamp in both ISO and Unix form.

 

Note: Querying subgraphs on The Graph’s decentralized network requires an API key, which can be obtained for free by creating an account on The Graph’s developer portal. Additionally, the gql Python package with its requests extra (gql[requests]) must be installed using pip.

Stream Reader Class: 

Unlike the Binance reader, which uses a numerical trade ID for tracking offsets, this Uniswap stream reader relies on timestamps. The latestOffset() method queries the latest swap timestamp from the target pool to establish the most recent point in time from which new data should be streamed.

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType
from datetime import datetime
from gql import gql, Client
from gql.transport.requests import RequestsHTTPTransport
import time

class SwapPartition(InputPartition):
   def __init__(self, start_ts, end_ts):
       self.start = start_ts
       self.end = end_ts

class UniswapStreamReader(DataSourceStreamReader):
   def __init__(self, schema, options):
       self.schema = schema
       self.options = options  # kept so initialOffset() can read start_unix_timestamp
       self.pool = options.get("pool")
       if not self.pool:
           raise ValueError("Missing required option: 'pool'")
       self.api_key = options.get("api_key")
       self.subgraph_id = options.get("subgraph_id")
       if not self.api_key or not self.subgraph_id:
           raise ValueError("Both 'api_key' and 'subgraph_id' are required.")
       self.graph_url = f"https://gateway.thegraph.com/api/{self.api_key}/subgraphs/id/{self.subgraph_id}"
   
   def _get_client(self):
       return Client(
           transport=RequestsHTTPTransport(
               url=self.graph_url,
               headers={"Content-Type": "application/json"},
               verify=True,
               retries=3
           ),
           fetch_schema_from_transport=False
       )

   def initialOffset(self):
       # Default to 60 seconds ago to align with Binance CEX real-time start
       default_start = int(time.time()) - 60
       start_ts = int(self.options.get("start_unix_timestamp", default_start))
       return {"last_timestamp": str(start_ts)}

   def latestOffset(self):
       # Fetch the timestamp of the most recent swap in the pool.
       # Spark uses it as the end offset of the next micro-batch, so each batch covers (start, latest].

       client = self._get_client()
       query = gql(f"""
       {{
         swaps(first: 1, orderBy: timestamp, orderDirection: desc,
               where: {{ pool: "{self.pool}" }}) {{
           timestamp
         }}
       }}
       """)
       result = client.execute(query)
       latest_ts = int(result["swaps"][0]["timestamp"]) if result["swaps"] else 0
       return {"last_timestamp": str(latest_ts)}

   def partitions(self, start, end):
       start_ts = int(start["last_timestamp"])
       end_ts = int(end["last_timestamp"])
       if end_ts <= start_ts:
           return []
       return [SwapPartition(start_ts + 1, end_ts)]

   def read(self, partition):
       client = self._get_client()
       # Dynamically get token0/token1 symbols
       pool_query = gql(f"""
       {{
           pool(id: "{self.pool}") {{
               token0 {{ symbol }}
               token1 {{ symbol }}
           }}
       }}
       """)
       pool_result = client.execute(pool_query)
       token0_symbol = pool_result["pool"]["token0"]["symbol"]
       token1_symbol = pool_result["pool"]["token1"]["symbol"]
       # Determine which amount field is ETH/USDC
       if token0_symbol == "USDC":
           usdc_field = "amount0"
           eth_field = "amount1"
       else:
           usdc_field = "amount1"
           eth_field = "amount0"

       # Fetch swaps
       swap_query = gql(f"""
       {{
         swaps(first: 100, orderBy: timestamp, orderDirection: asc,
               where: {{ pool: "{self.pool}", timestamp_gt: {partition.start}, timestamp_lte: {partition.end} }}) {{
           id
           timestamp
           amountUSD
           amount0
           amount1
         }}
       }}
       """)
       result = client.execute(swap_query)
       for swap in result.get("swaps", []):
           yield (
               swap["id"],
               "ETH/USDC",
               float(swap["amountUSD"]),
               float(swap[eth_field]),
               float(swap[usdc_field]),
               datetime.utcfromtimestamp(int(swap["timestamp"])).isoformat(),
               int(swap["timestamp"])
           )

   def commit(self, end):
       pass

Data Source Class:

class UniswapDataSource(DataSource):
   @classmethod
   def name(cls):
       return "uniswap_swap"

   def __init__(self, options):
       self.options = options

   def schema(self):
       return StructType([
           StructField("swap_id", StringType()),
           StructField("pair", StringType()),
           StructField("amount_usd", DoubleType()),
           StructField("amount_eth", DoubleType()),
           StructField("amount_usdc", DoubleType()),
           StructField("timestamp", StringType()),
           StructField("timestamp_unix", LongType())
       ])

   def streamReader(self, schema):
       return UniswapStreamReader(schema, self.options)
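
The swap query in read() requests at most 100 rows per call (first: 100), so a busy window could hold more swaps than a single request returns. One way to handle this is to page through the results, sketched below under the assumption that the endpoint accepts a skip-style offset alongside the page size. The fetch_page callable is hypothetical and stands in for the GraphQL call.

```python
def fetch_all(fetch_page, page_size=100):
    """Collect rows by calling fetch_page(skip, limit) until a short page arrives.

    `fetch_page` is a stand-in for a function that runs the swap query with
    the given offset and page size and returns a list of rows.
    """
    rows = []
    skip = 0
    while True:
        page = fetch_page(skip, page_size)
        rows.extend(page)
        if len(page) < page_size:  # a short (or empty) page means we are done
            return rows
        skip += page_size


# Simulate a window containing 250 swaps.
fake_swaps = list(range(250))
result = fetch_all(lambda skip, limit: fake_swaps[skip:skip + limit])
print(len(result))
```

Keeping the paging loop separate from the transport makes it easy to test against an in-memory list, as shown here.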

 

3c) Ethereum Node - Blockchain Metadata via JSON-RPC

Ethereum block metadata is retrieved through public RPC endpoints using Web3.py. The reader fetches each new block since the last offset and extracts relevant metrics such as block number, timestamp, transaction count, miner address, and base fee per gas unit. To maintain ingestion order and prevent duplicates, the reader uses the block number as the streaming offset. Blocks with no transactions are excluded to reduce noise and focus on active on-chain events. The schema includes fields to support gas analytics, transaction volume monitoring, and block-level observability.

Stream Reader Class:

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, DoubleType
from web3 import Web3
from datetime import datetime

class BlockPartition(InputPartition):
   def __init__(self, block_number):
       self.block_number = block_number

class EthereumStreamReader(DataSourceStreamReader):
   def __init__(self, schema, options):
       self.schema = schema
       self.rpc_url = options.get("rpc_url")
       if not self.rpc_url:
           raise ValueError("Missing required option: 'rpc_url'")

   def _get_web3(self):
       return Web3(Web3.HTTPProvider(self.rpc_url))

   def initialOffset(self):
       latest_block = self._get_web3().eth.get_block("latest")
       return {"last_block_number": str(latest_block.number)}

   def latestOffset(self):
       latest_block = self._get_web3().eth.get_block("latest")
       return {"last_block_number": str(latest_block.number)}

   def partitions(self, start, end):
       start_num = int(start["last_block_number"])
       end_num = int(end["last_block_number"])
       if end_num <= start_num:
           return []
       return [BlockPartition(bn) for bn in range(start_num + 1, end_num + 1)]

   def read(self, partition):
       w3 = self._get_web3()
       block = w3.eth.get_block(partition.block_number)

       # Filter out empty blocks
       if not block.transactions or len(block.transactions) == 0:
           return iter([])

       gas_price = block.get("baseFeePerGas", 0)
       if gas_price is not None and isinstance(gas_price, int):
           gas_price_gwei = gas_price / 1e9
       else:
           gas_price_gwei = None

       return iter([(
           block.number,
           datetime.utcfromtimestamp(block.timestamp).isoformat(),
           int(block.timestamp),
           len(block.transactions),
           block.miner,
           block.hash.hex(),
           gas_price_gwei
       )])

   def commit(self, end):
       pass

Data Source Class:

class EthereumDataSource(DataSource):
   @classmethod
   def name(cls):
       return "ethereum_block"

   def __init__(self, options):
       self.options = options

   def schema(self):
       return StructType([
           StructField("block_number", IntegerType()),
           StructField("timestamp", StringType()),
           StructField("timestamp_unix", LongType()),
           StructField("tx_count", IntegerType()),
           StructField("miner", StringType()),
           StructField("block_hash", StringType()),
           StructField("base_fee_gwei", DoubleType())
       ])

   def streamReader(self, schema):
       return EthereumStreamReader(schema, self.options)
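
The base fee arrives in wei and is stored in gwei (1 gwei = 1e9 wei). The sketch below isolates that conversion in a hypothetical helper, base_fee_to_gwei. Unlike the reader above, which defaults a missing baseFeePerGas to 0, this variant returns None when the field is absent (as it is for blocks produced before EIP-1559), which is a design choice rather than the original behavior.

```python
WEI_PER_GWEI = 1_000_000_000


def base_fee_to_gwei(base_fee_wei):
    """Convert a base fee in wei to gwei; return None if the fee is missing."""
    if base_fee_wei is None:
        return None
    return base_fee_wei / WEI_PER_GWEI


print(base_fee_to_gwei(25_000_000_000))
print(base_fee_to_gwei(None))
```

Returning None keeps missing fees out of downstream averages, whereas a default of 0 would pull them down.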

 

4. Bronze Layer: Raw Ingestion from Web3 Sources

The Bronze tables act as the raw landing zone for real-time data ingested from three Web3 sources via the Databricks Python Data Source API with resumable polling logic. Each source streams live data in its native format and schema, enabling ingestion at scale without external infrastructure.

The Bronze tables are populated using Structured Streaming with the Python Data Source API. Each source is registered as a custom streaming source. Binance provides CEX trades, Uniswap provides DEX swap data, and Ethereum provides block-level metadata. These sources are read using spark.readStream, and the data is continuously written into Delta tables on classic compute.

This layer captures raw trade, swap, and blockchain data directly from public Web3 APIs and serves as the foundation for downstream processing.

Ingestion Logic:

4a) bronze_binance_trades

spark.dataSource.register(BinanceDataSource)

(spark.readStream.format("binance_trade").load()
 .writeStream.format("delta")
 .option("checkpointLocation", "/Volumes/devyk/web3_blog/vol_checkpoints/bronze_binance_trades")
 .outputMode("append")
 .trigger(processingTime="1 second") # Short interval for near-continuous processing
 .toTable("devyk.web3_blog.bronze_binance_trades"))

The following output shows raw ETH/USDC trade events captured from Binance.

srikanthkilari_0-1748524537684.png

 

4b) bronze_uniswap_swaps

spark.dataSource.register(UniswapDataSource)

(spark.readStream
   .format("uniswap_swap")
   .option("pool", "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8")
   .option("api_key", f"{dbutils.secrets.get('devyk-web3-blog', 'dex-api-key')}")
   .option("subgraph_id", f"{dbutils.secrets.get('devyk-web3-blog', 'dex-subgraph-id')}")
   .load()
   .writeStream.format("delta")
   .option("checkpointLocation", "/Volumes/devyk/web3_blog/vol_checkpoints/bronze_uniswap_swaps")
   .outputMode("append")
   .trigger(processingTime="1 second")
   .toTable("devyk.web3_blog.bronze_uniswap_swaps"))

The output below displays recent swap transactions from the Uniswap pool, including USD and token amounts.

srikanthkilari_1-1748524606007.png

 

4c) bronze_ethereum_blocks

spark.dataSource.register(EthereumDataSource)


(spark.readStream
   .format("ethereum_block")
   .option("rpc_url", f"{dbutils.secrets.get('devyk-web3-blog', 'ethereum-alchemy-endpoint')}")
   .load()
   .writeStream.format("delta")
   .option("checkpointLocation", "/Volumes/devyk/web3_blog/vol_checkpoints/bronze_ethereum_blocks")
   .outputMode("append")
   .trigger(processingTime="1 second")
   .toTable("devyk.web3_blog.bronze_ethereum_blocks"))

The output below shows raw Ethereum block metadata such as block number, miner, and gas fee details.

srikanthkilari_2-1748524672733.png

Each streaming query uses a checkpoint location to track offset progress and ensure fault tolerance. The Bronze tables act as an append-only event log and are read inside the DLT pipeline through streaming views that apply transformations and schema normalization.

 

5. Silver Layer: Clean and Normalized Data for Analysis

The Bronze streaming tables, populated using Structured Streaming via the Python Data Source API, capture raw trade, swap, and block data into Databricks Delta tables. To ensure data consistency and avoid duplication due to potential replays, delivery retries, or historical backfills, the Silver layer is implemented using SCD Type 1 logic within the DLT pipeline.

Using the apply_changes() function, the pipeline performs deduplication based on primary keys and event timestamps. Each Silver table is maintained as a Delta table that stores only the latest version of each record, so downstream streaming transformations always see accurate, current data. This approach provides strong data quality guarantees while retaining a low-latency, scalable processing pattern for continuous ingestion from Web3 APIs.
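
Conceptually, SCD Type 1 keeps one row per key and overwrites it only when a newer version arrives. The plain-Python sketch below models that behavior for intuition; it is not how DLT implements apply_changes(), and the helper name apply_scd1 and the tie-breaking rule (the first row seen wins on equal sequence values) are assumptions.

```python
def apply_scd1(target, changes, key, sequence_by):
    """Upsert `changes` into `target` (a dict keyed by `key`), keeping the newest row."""
    for row in changes:
        current = target.get(row[key])
        # Insert unseen keys; overwrite only with a strictly newer sequence value.
        if current is None or row[sequence_by] > current[sequence_by]:
            target[row[key]] = row
    return target


changes = [
    {"trade_id": "1", "ts": "2025-05-29T12:00:01", "price": 2500.0},
    {"trade_id": "2", "ts": "2025-05-29T12:00:02", "price": 2501.0},
    {"trade_id": "1", "ts": "2025-05-29T12:00:01", "price": 2500.0},  # replayed duplicate
    {"trade_id": "2", "ts": "2025-05-29T12:00:00", "price": 2499.0},  # late, older version
]
silver = apply_scd1({}, changes, key="trade_id", sequence_by="ts")
print(silver)
```

Replayed duplicates and late-arriving older versions leave the table unchanged, which is the property the Silver layer relies on.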

Since the Bronze tables are created and updated outside the DLT pipeline using Structured Streaming, the Silver pipeline reads from those Delta tables through intermediate views, which are not published to Unity Catalog. These views apply lightweight transformations such as data type casting, timestamp normalization, and schema alignment. They serve as standardized staging layers that make the data ready for apply_changes() to maintain clean and consistent Silver tables. 

These Silver streaming tables act as the canonical, deduplicated layer for all downstream analytical processing. They ensure that real-time Web3 data is clean, normalized, and ready to power unified insights across decentralized and centralized sources.

 

5a) silver_binance_trades

This streaming table is derived from the bronze_binance_trades source. It prepares the centralized exchange (CEX) trade data for downstream analytics related to trend detection, volume measurement, and pricing behavior.

import dlt
from pyspark.sql.functions import col  # needed for sequence_by below
@dlt.view
def vw_bronze_binance_trades():
 return spark.readStream.table("bronze_binance_trades")\
       .selectExpr(
           "trade_id",
           "symbol AS pair",
           "CAST(timestamp AS TIMESTAMP) AS ts",
           "CAST(price AS DOUBLE) AS price",
           "CAST(quantity AS DOUBLE) AS quantity",
           "price * quantity AS notional_usd",
           "date_trunc('minute', CAST(timestamp AS TIMESTAMP)) AS minute_bucket"
       )

dlt.create_target_table(
  name = "silver_binance_trades"
)
dlt.apply_changes(
  target = "silver_binance_trades",
  source = "vw_bronze_binance_trades",
  keys = ["trade_id"],
  sequence_by = col("ts"),
  stored_as_scd_type = 1
)

The output below presents cleaned and deduplicated Binance trade records with normalized timestamps and pricing fields.

srikanthkilari_3-1748524757799.png

 

5b) silver_uniswap_swaps

This streaming table is derived from bronze_uniswap_swaps and provides a clean, structured view of swap activity on Uniswap. It supports slippage detection, liquidity assessment, and DEX-based analytics.

import dlt
from pyspark.sql.functions import col  # needed for sequence_by below
@dlt.view
def vw_bronze_uniswap_swaps():
   return (
       spark.readStream.table("bronze_uniswap_swaps")
       .selectExpr(
           "swap_id",
           "pair",
           "CAST(timestamp AS TIMESTAMP) AS ts",
           "CAST(amount_usd AS DOUBLE) AS amount_usd",
           "CAST(amount_eth AS DOUBLE) AS amount_eth",
           "CAST(amount_usdc AS DOUBLE) AS amount_usdc",
           "date_trunc('minute', CAST(timestamp AS TIMESTAMP)) AS minute_bucket"
       )
   )

dlt.create_target_table(
  name = "silver_uniswap_swaps"
)
dlt.apply_changes(
  target = "silver_uniswap_swaps",
  source = "vw_bronze_uniswap_swaps",
  keys = ["swap_id"],
  sequence_by = col("ts"),
  stored_as_scd_type = 1
)

The following sample shows structured and time-aligned Uniswap swap records with derived ETH and USDC amounts.

srikanthkilari_4-1748524818989.png

 

5c) silver_ethereum_blocks

This streaming table is derived from bronze_ethereum_blocks and enables time-aligned analysis of Ethereum blockchain activity. It retains gas fee metrics and miner information for correlation with CEX and DEX trading behavior.

import dlt
from pyspark.sql.functions import col  # needed for sequence_by below
@dlt.view
def vw_bronze_ethereum_blocks():
 return spark.readStream.table("bronze_ethereum_blocks")\
       .selectExpr(
           "block_number",
           "CAST(timestamp AS TIMESTAMP) AS ts",
           "tx_count",
           "miner",
           "base_fee_gwei",
           "date_trunc('minute', CAST(timestamp AS TIMESTAMP)) AS minute_bucket"
       )

dlt.create_target_table(
  name = "silver_ethereum_blocks"
)
dlt.apply_changes(
  target = "silver_ethereum_blocks",
  source = "vw_bronze_ethereum_blocks",
  keys = ["block_number"],
  sequence_by = col("ts"),
  stored_as_scd_type = 1
)

The output displays curated Ethereum block details, including gas fees and transaction counts, ready for downstream analysis.

srikanthkilari_5-1748524872601.png

 

6. Gold Layer: Near Real-Time Web3 Trading Analytics

The Gold layer transforms curated data from the Silver layer into actionable, time-aligned trading and blockchain analytics. These aggregations are defined as MVs using DLT, enabling incremental refresh and efficient computation over streaming data. Each Gold MV is designed to extract cross-source insights from CEX trades, DEX swaps, and on-chain metadata. The aggregations are grouped by minute-level buckets, which provide a consistent time axis to correlate trends across these heterogeneous systems. The following Gold MVs are implemented:
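
The minute-level bucket is the shared time axis for all three sources. As a small illustration outside Spark, the sketch below reproduces the effect of date_trunc('minute', ...) on the ISO-formatted timestamps the readers emit. The helper name minute_bucket is hypothetical.

```python
from datetime import datetime


def minute_bucket(iso_ts):
    """Truncate an ISO-8601 timestamp string to the start of its minute."""
    return datetime.fromisoformat(iso_ts).replace(second=0, microsecond=0)


# A CEX trade and a DEX swap a few seconds apart land in the same bucket.
trade_bucket = minute_bucket("2025-05-29T12:34:56.789000")
swap_bucket = minute_bucket("2025-05-29T12:34:03")
print(trade_bucket, swap_bucket, trade_bucket == swap_bucket)
```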

6a) gold_price_metrics

This Gold MV joins Silver CEX and DEX data on the minute-level time bucket to produce unified trading metrics for the ETH/USDC pair. It computes average and volume-weighted average prices (VWAP) from the Binance trades, and derives corresponding DEX prices from Uniswap swaps based on the ETH and USDC amounts exchanged. It also includes trade and swap counts per minute and estimates slippage by comparing CEX and DEX price trends. This MV supports use cases such as arbitrage detection, cross-market volume analysis, and liquidity monitoring.

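import dlt
# Spark column functions; these intentionally shadow the Python built-ins of the same name
from pyspark.sql.functions import abs, avg, col, countDistinct, round, sum, when

@dlt.table(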
 name="gold_price_metrics",
 comment="Aggregates trading metrics across CEX and DEX sources for each crypto pair"
)
def gold_price_metrics():
   df_cex = dlt.read("silver_binance_trades")
   df_dex = dlt.read("silver_uniswap_swaps").filter("amount_eth IS NOT NULL AND amount_eth != 0")
   return (
       df_cex.alias("c")
       .join(df_dex.alias("d"), on=["pair", "minute_bucket"], how="inner")
       .groupBy("pair", "minute_bucket")
       .agg(
           round(avg("c.price"), 4).alias("avg_cex_price"),
           round(when(sum("c.quantity") != 0, sum("c.notional_usd") / sum("c.quantity")).otherwise(None),4).alias("vw_cex_price"),
           round(abs(avg("amount_usdc") / avg("amount_eth")), 4).alias("avg_dex_price"),
           round(when(sum("amount_eth") != 0, abs(sum("amount_usdc") / sum("amount_eth"))).otherwise(None),4).alias("vw_dex_price"),
           countDistinct("c.trade_id").alias("cex_trade_count"),
           countDistinct("d.swap_id").alias("dex_swap_count"),
           round(
               when(
                   avg(col("d.amount_usdc") / col("d.amount_eth")).isNotNull()
                   & avg("c.price").isNotNull()
                   & (avg("c.price") != 0),
                   (abs(avg(col("d.amount_usdc") / col("d.amount_eth"))) - avg("c.price")) / avg("c.price")
               ).otherwise(None),
               4
           ).alias("estimated_slippage_ratio")
       )
   )

The following output shows minute-level pricing metrics, trade volumes, and estimated slippage between CEX and DEX for the ETH/USDC pair.

srikanthkilari_6-1748524949117.png
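
To make the arithmetic behind these metrics concrete, the sketch below computes the simple average price, the VWAP, and the slippage ratio for one minute bucket in plain Python. It mirrors the slippage expression in the MV, (|avg DEX price| - avg CEX price) / avg CEX price. The function name price_metrics and the sample numbers are made up for illustration; swap amounts are treated as signed values, which is why the absolute value is taken.

```python
def price_metrics(trades, swaps):
    """trades: list of (price, quantity); swaps: list of (amount_eth, amount_usdc)."""
    if not trades:
        raise ValueError("at least one trade is required")
    ratios = [usdc / eth for eth, usdc in swaps if eth]  # skip zero-ETH swaps
    if not ratios:
        raise ValueError("at least one swap with a non-zero ETH amount is required")

    avg_cex = sum(p for p, _ in trades) / len(trades)
    total_qty = sum(q for _, q in trades)
    vw_cex = sum(p * q for p, q in trades) / total_qty if total_qty else None
    avg_dex = abs(sum(ratios) / len(ratios))
    slippage = (avg_dex - avg_cex) / avg_cex if avg_cex else None
    return {
        "avg_cex_price": avg_cex,
        "vw_cex_price": vw_cex,
        "avg_dex_price": avg_dex,
        "estimated_slippage_ratio": slippage,
    }


metrics = price_metrics(
    trades=[(2000.0, 1.0), (2100.0, 3.0)],
    swaps=[(-1.0, 2152.5), (2.0, -4305.0)],
)
print(metrics)
```

The VWAP sits closer to 2100 than the simple average does because the larger trade carries more weight.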

 

6b) gold_blockchain_metrics

This Gold MV aggregates block-level metadata from the Ethereum Silver table. It calculates metrics such as the number of blocks produced per minute, total transaction count, and the average base gas fee (in gwei) for each time interval. These metrics provide insight into on-chain activity, network congestion, and gas trends over time. The results can be correlated with trading volume or price movement to understand how market behavior aligns with blockchain activity.

@dlt.table(
 name="gold_blockchain_metrics",
 comment="Aggregates Ethereum block metadata and gas metrics"
)
def gold_blockchain_metrics():
   return (
       dlt.read("silver_ethereum_blocks")
       .withColumn("blockchain_name",lit("ethereum"))
       .groupBy("blockchain_name","minute_bucket")
       .agg(
           countDistinct("block_number").alias("blocks_per_minute"),
           sum("tx_count").alias("total_transactions"),
           round(avg("base_fee_gwei"), 4).alias("avg_gas_fee")
       )
   )

The output below presents aggregated Ethereum block activity per minute, including transaction counts and average gas fees.

srikanthkilari_7-1748524995849.png
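The same per-minute aggregation can be checked by hand on a few rows. The sketch below is a plain-Python equivalent, not the pipeline code; the sample blocks and the function name are hypothetical, and it assumes each block appears only once in the input.

```python
from collections import defaultdict

# Hypothetical block rows: (minute_bucket, block_number, tx_count, base_fee_gwei)
blocks = [
    ("2025-05-29 12:00", 100, 150, 2.0),
    ("2025-05-29 12:00", 101, 170, 4.0),
    ("2025-05-29 12:01", 102, 90, 3.0),
]

def aggregate_blocks(rows):
    """Group block rows by minute bucket and compute the three Gold metrics."""
    grouped = defaultdict(list)
    for minute_bucket, block_number, tx_count, base_fee_gwei in rows:
        grouped[minute_bucket].append((block_number, tx_count, base_fee_gwei))

    metrics = {}
    for minute_bucket, items in grouped.items():
        metrics[minute_bucket] = {
            # Distinct block numbers seen in the minute
            "blocks_per_minute": len({block_number for block_number, _, _ in items}),
            # Total transactions across those blocks
            "total_transactions": sum(tx_count for _, tx_count, _ in items),
            # Mean base fee in gwei, rounded like the pipeline output
            "avg_gas_fee": round(sum(fee for _, _, fee in items) / len(items), 4),
        }
    return metrics

block_metrics = aggregate_blocks(blocks)
for minute_bucket, values in sorted(block_metrics.items()):
    print(minute_bucket, values)
```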

 

6c) gold_ema_signals

This Gold MV calculates two widely used technical indicators, the Moving Average Convergence Divergence (MACD) and the Relative Strength Index (RSI), from the per-minute average price of ETH/USDC trades recorded in the Binance Silver table. It first computes a short-term 12-period average (ema_short) and a longer-term 26-period average (ema_long). The MACD is the difference between the two, and a 9-period average of the MACD serves as the Signal Line (macd_signal). Note that the code below approximates each Exponential Moving Average (EMA) with an equally weighted rolling mean over the lookback window rather than an exponentially weighted one. The RSI is calculated from average gains and losses over a 14-period window to measure the strength of recent price movements. Together, these indicators help identify momentum shifts, overbought or oversold conditions, and potential market entry or exit points, and they are commonly used in algorithmic trading strategies and market signal analysis.

@dlt.table(
 name="gold_ema_signals",
 comment="Computes MACD, Signal Line, and RSI using CEX price time series"
)
def gold_ema_signals():
   df = (
       dlt.read("silver_binance_trades")
       .groupBy("pair", "minute_bucket")
       .agg(round(avg("price"),4).alias("avg_price"))
   )
   w_order = Window.partitionBy("pair").orderBy("minute_bucket")
   w_ema_short = w_order.rowsBetween(-11, 0)
   w_ema_long = w_order.rowsBetween(-25, 0)
   w_signal = w_order.rowsBetween(-8, 0)
   w_rsi = w_order.rowsBetween(-13, 0)  # 14-period window
   df_with_change = df.withColumn("prev_avg_price", lag("avg_price").over(w_order)) \
       .withColumn("price_change", col("avg_price") - col("prev_avg_price")) \
       .withColumn("gain", when(col("price_change") > 0, col("price_change")).otherwise(0)) \
       .withColumn("loss", when(col("price_change") < 0, abs(col("price_change"))).otherwise(0))
   return (
       df_with_change
       .withColumn("ema_short", round(avg("avg_price").over(w_ema_short), 4))
       .withColumn("ema_long", round(avg("avg_price").over(w_ema_long), 4))
       .withColumn("macd", round(col("ema_short") - col("ema_long"), 4))
       .withColumn("macd_signal", round(avg("macd").over(w_signal), 4))
       .withColumn("avg_gain", avg("gain").over(w_rsi))
       .withColumn("avg_loss", avg("loss").over(w_rsi))
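       .withColumn(
           "rsi",
           round(
               when(col("avg_loss") != 0, 100 - (100 / (1 + (col("avg_gain") / col("avg_loss")))))
               .when(col("avg_gain") > 0, 100.0),  # no losses in the window: RSI is 100 by convention; flat windows stay null
               2
           )
       )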
       .select("pair", "minute_bucket", "avg_price", "ema_short", "ema_long", "macd", "macd_signal", "rsi")
   )

The sample result displays calculated EMA values, MACD indicators, and RSI scores based on Binance price movements.

srikanthkilari_8-1748525054491.png
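The pipeline code above derives ema_short, ema_long, and macd_signal with avg over fixed row windows, which is an equally weighted rolling mean. For comparison, a recursively weighted EMA with the conventional smoothing factor 2 / (period + 1) could be sketched in plain Python as shown below. This is a sketch under stated assumptions rather than the pipeline's method: the function names are illustrative, seeding the EMA with the first value is one of several common choices, and the RSI uses simple averages of gains and losses (as the pipeline does) instead of Wilder smoothing.

```python
def ema(values, period):
    """Recursive EMA with alpha = 2 / (period + 1), seeded with the first value."""
    alpha = 2.0 / (period + 1)
    out = []
    prev = None
    for v in values:
        prev = v if prev is None else alpha * v + (1 - alpha) * prev
        out.append(prev)
    return out

def macd(prices, short=12, long=26, signal=9):
    """Return (macd_line, signal_line) built from two EMAs of the price series."""
    ema_short = ema(prices, short)
    ema_long = ema(prices, long)
    macd_line = [s - l for s, l in zip(ema_short, ema_long)]
    signal_line = ema(macd_line, signal)
    return macd_line, signal_line

def rsi(prices, period=14):
    """Simple-average RSI; None until `period` price changes are available."""
    changes = [b - a for a, b in zip(prices, prices[1:])]
    out = [None] * len(prices)
    for i in range(period, len(prices)):
        window = changes[i - period:i]  # the last `period` changes ending at price i
        avg_gain = sum(c for c in window if c > 0) / period
        avg_loss = sum(-c for c in window if c < 0) / period
        if avg_loss == 0:
            # No losses: RSI is 100 by convention; a completely flat window is left undefined.
            out[i] = 100.0 if avg_gain > 0 else None
        else:
            out[i] = 100 - 100 / (1 + avg_gain / avg_loss)
    return out

# Made-up per-minute average prices that alternate up and down by 1.
sample_prices = [10.0 + (i % 2) for i in range(30)]
macd_line, signal_line = macd(sample_prices)
rsi_values = rsi(sample_prices)
print(round(macd_line[-1], 4), round(signal_line[-1], 4), rsi_values[-1])
```

Compared with a rolling mean, the recursive form weights recent prices more heavily and never drops older observations abruptly, so the two approaches can produce noticeably different MACD crossovers on volatile series.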

 

7. Conclusion 

Databricks provides a strong foundation for building near real-time analytics pipelines across both decentralized and centralized financial systems. With native support for the Python Data Source API, it becomes possible to stream live trading and blockchain data directly from public APIs, eliminating the need for complex intermediate infrastructure.

This solution adopts the Medallion Architecture to organize the pipeline into clean, modular layers while keeping end-to-end latency low. The Bronze layer captures raw Web3 events using Structured Streaming via the Python Data Source API. The Silver streaming tables and Gold Materialized Views (MVs), implemented using serverless DLT, standardize and aggregate the data into near real-time trading indicators and blockchain insights. These MVs compute metrics such as MACD, RSI, gas fees, trade volumes, and slippage across CEX, DEX, and on-chain activity.

This architecture enables scalable, maintainable, and fully automated pipelines that continuously respond to new market and network signals. It lays the foundation for advanced analytics use cases such as cross-exchange arbitrage detection, on-chain trend correlation, and unified trading intelligence on the lakehouse.