Are you ever frustrated trying to bring external data sources into Databricks? With the rapid growth of products, partners, and connectors, it can feel overwhelming to keep up. However, Databricks remains a first-class Data and AI platform capable of handling nearly any data ingestion scenario.
Thanks to the evolving Lakehouse ecosystem, especially with the rapid development of Lakeflow Connect and its extensive connector ecosystem, ingesting data into Databricks has become more straightforward than ever. Still, with countless connectors being built—many of which may never be a priority—having a flexible, robust approach is essential.
This is where the new and improved Spark Data Source API comes into play. If you're looking to read data—whether batch or streaming—into Databricks, and haven't found a reliable production-ready method yet, this blog is for you.
Before diving deeper, I want to share some valuable references that complement the approach discussed here. These resources provide additional insights into data ingestion and custom data sources:
Note: This blog is the only one, to my knowledge, that explains how to implement JDBC connections specifically using the new Data Source API for both batch and streaming scenarios.
In this example, I'll demonstrate how to use the new API to ingest data from SQL Server in a streaming fashion, employing incremental load techniques to avoid full truncates and reloads. While I’ll focus on SQL Server, the method is applicable to any source that has a unique key (primary or composite) or a transaction timestamp.
For this, I'll use the open-source JDBC Python package JayDeBeApi (imported as jaydebeapi), which I will wrap within custom DataSource and DataSourceStreamReader classes.
import jaydebeapi
import json
import requests
from datetime import datetime
from typing import Iterator, Tuple

from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
Create a subclass of the DataSource.
class JDBCStreamDataSource(DataSource):

    @classmethod
    def name(cls):
        # The format name used with spark.readStream.format(...)
        return "sql_server_connector"

    def schema(self):
        """Defines the output schema of the data source."""
        return StructType([
            StructField("user_id", IntegerType(), True),
            StructField("title", StringType(), True),
            StructField("salary", IntegerType(), True),
            StructField("update_time", StringType(), True)
        ])

    def streamReader(self, schema: StructType):
        return JDBCStreamReader(schema, self.options)
Within the DataSource class, the schema method defines the output schema of your streaming table; it can be built dynamically and does not have to be hard-coded. The name method is simply what you want to call your Spark connector (here, sql_server_connector), and it must match the format string you later pass to spark.readStream.format(). The streamReader method returns the JDBCStreamReader class that we will cover in the next section.
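For example, here is a minimal sketch of a dynamic schema, assuming a hypothetical schema_json option supplied by the caller (this option is not part of the original code; the hard-coded schema remains the fallback):

    # Hypothetical drop-in replacement for schema(): build the schema from an option.
    # Assumes the caller passes .option("schema_json", "<StructType JSON string>").
    def schema(self):
        schema_json = self.options.get("schema_json")
        if schema_json:
            # StructType.fromJson rebuilds a schema from its JSON representation
            return StructType.fromJson(json.loads(schema_json))
        # Fall back to the hard-coded schema when no option is supplied
        return StructType([
            StructField("user_id", IntegerType(), True),
            StructField("title", StringType(), True),
            StructField("salary", IntegerType(), True),
            StructField("update_time", StringType(), True)
        ])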
class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end
The RangePartition class splits the ingest across as many Spark tasks as there are partitions. Partitioning is useful when the source system has high throughput (as a rough benchmark, more than about 1,000 records per second); the partitioning logic itself is shown in the next section. If you don't need parallelism, a single partition is enough, as sketched just below.
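Here is a minimal sketch of that single-partition case, as a drop-in replacement for the partitions method shown later (same offset format, no parallel split):

    # Minimal sketch: emit one partition that covers the whole offset range.
    # Useful when the source produces few enough rows that parallel reads
    # are unnecessary.
    def partitions(self, start: dict, end: dict):
        return [RangePartition(start["offset"], end["offset"])]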
Finally, we have the main class that actually reads and ingests the data. Create a subclass of the DataSourceStreamReader.
class JDBCStreamReader(DataSourceStreamReader):

    def __init__(self, schema, options):
        self.jdbc_url = options.get("jdbc_url")
        self.jdbc_driver_name = options.get("jdbc_driver_name")
        self.jdbc_driver_jar_url = options.get("jdbc_driver_jar_url")
        self.username = options.get("username")
        self.password = options.get("password")
        self.table = options.get("table")
        self.progress_path = options.get("progress_path")
        # token and workspace_url are assumed to be defined earlier in the
        # notebook (for example, a personal access token pulled from a secret
        # scope and your workspace hostname); they authenticate the Files API
        # calls used to persist progress.
        self.token = token
        self.url = f"https://{workspace_url}/api/2.0/fs/files{self.progress_path}progress.json"
        self._load_progress()
        self.start_time = options.get("start_time")
    def initialOffset(self):
        """
        Only read records updated after the configured start_time.
        """
        return {"offset": self.start_time}
    def _load_progress(self):
        # Read the last committed offset from progress.json via the Files API
        headers = {
            "Authorization": f"Bearer {self.token}",
        }
        response = requests.get(self.url, headers=headers)
        current = response.json().get('current', 0)
        self.current = current

    def _save_progress(self):
        # Persist the latest offset to progress.json, overwriting the previous value
        url = f"{self.url}?overwrite=true"
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }
        data = json.dumps({"current": self.current})
        response = requests.put(url, headers=headers, data=data)
    def latestOffset(self) -> dict:
        # Advance the offset to "now"; the next batch covers (previous offset, now]
        self.current = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return {"offset": self.current}
    def partitions(self, start: dict, end: dict):
        # Returns a sequence of partition objects, one per Spark task
        start_offset = datetime.strptime(start["offset"], "%Y-%m-%d %H:%M:%S")
        end_offset = datetime.strptime(end["offset"], "%Y-%m-%d %H:%M:%S")
        # Calculate the total duration of this batch's offset range
        total_duration = end_offset - start_offset
        # Calculate the duration of each partition range
        step = total_duration / 4
        # Generate the 4 sub-ranges using a list comprehension
        ranges = [
            (
                (start_offset + step * i).strftime("%Y-%m-%d %H:%M:%S"),
                (start_offset + step * (i + 1)).strftime("%Y-%m-%d %H:%M:%S")
            ) for i in range(4)
        ]
        return [RangePartition(range_start, range_end) for range_start, range_end in ranges]
    def commit(self, end):
        # Called once a batch succeeds; persist the offset so restarts resume from here
        self._save_progress()
    def read(self, partition) -> Iterator[Tuple]:
        """
        Read records between a time range.
        """
        # Import inside the method so the package resolves on the executors
        import jaydebeapi

        start_timestamp, end_timestamp = partition.start, partition.end
        with jaydebeapi.connect(
            jclassname=self.jdbc_driver_name,
            url=self.jdbc_url,
            driver_args=[self.username, self.password],
            jars=self.jdbc_driver_jar_url
        ) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"SELECT * FROM {self.table} "
                    f"WHERE update_time > '{start_timestamp}' AND update_time <= '{end_timestamp}'"
                )
                rows = cursor.fetchall()
                # Yield one tuple per row instead of materializing a DataFrame
                for r in rows:
                    yield (r[0], r[1], r[2], r[3])
There is a lot to unpack here. To save time, one of my great colleagues has explained many of the methods used within this approach; see their blog for reference.
There are a few important aspects worth highlighting:
The partitions method splits each batch's time range into four sub-ranges based on the timestamp column. Each sub-range becomes a RangePartition, and each partition is read by its own Spark task. This effectively parallelizes the stream, enabling scalable ingestion.
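If you prefer not to hard-code the four-way split, here is a hedged sketch that reads the partition count from a hypothetical num_partitions option (it also assumes you store self.options = options in __init__, which the code above does not do):

    # Hypothetical variant: read the partition count from an option,
    # e.g. .option("num_partitions", "8"). Assumes self.options = options
    # was saved in __init__.
    def partitions(self, start: dict, end: dict):
        n = int(self.options.get("num_partitions", "4"))
        start_offset = datetime.strptime(start["offset"], "%Y-%m-%d %H:%M:%S")
        end_offset = datetime.strptime(end["offset"], "%Y-%m-%d %H:%M:%S")
        step = (end_offset - start_offset) / n
        return [
            RangePartition(
                (start_offset + step * i).strftime("%Y-%m-%d %H:%M:%S"),
                (start_offset + step * (i + 1)).strftime("%Y-%m-%d %H:%M:%S")
            )
            for i in range(n)
        ]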
Incremental ingestion is achieved by saving the latest offset, specifically the timestamp captured for each batch. latestOffset() records the new high-water mark, and once the batch succeeds, commit() passes it to _save_progress(), which persists the progress to a Unity Catalog volume (or other persistent storage) via the Files API. When the next batch begins, _load_progress() retrieves this saved value to set the starting point, ensuring no data is missed or duplicated.
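For reference, the persisted file is just a small JSON document. Below is a hedged sketch of its contents plus a slightly more defensive _load_progress that tolerates a missing file on the very first run (the original code assumes the file already exists):

    # progress.json as written by _save_progress():
    #   {"current": "2025-06-17 11:02:15"}

    # Hypothetical defensive variant: fall back when no progress file exists yet.
    def _load_progress(self):
        headers = {"Authorization": f"Bearer {self.token}"}
        response = requests.get(self.url, headers=headers)
        if response.status_code == 200:
            self.current = response.json().get("current", 0)
        else:
            # No progress file yet; initialOffset()/start_time drives the first batch
            self.current = 0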
Lastly, we use a generator (yield) instead of returning a full list so that we never materialize the entire result set in memory at once. This matters most on the initial backfill, or when a scheduled run has to catch up on a large incremental load.
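Along the same lines, here is a hedged sketch of the inner read loop using cursor.fetchmany() so rows are pulled from SQL Server in chunks rather than all at once with fetchall(). The 10,000-row chunk size is an arbitrary choice, and this fragment would replace the body of the with conn.cursor() as cursor: block above:

# Hypothetical chunked-fetch variant of the read loop.
cursor.execute(
    f"SELECT * FROM {self.table} "
    f"WHERE update_time > '{start_timestamp}' AND update_time <= '{end_timestamp}'"
)
while True:
    batch = cursor.fetchmany(10000)  # pull at most 10k rows per round trip
    if not batch:
        break
    for r in batch:
        yield (r[0], r[1], r[2], r[3])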
Now it’s time for the fun part!
Here we register the connector for consumption:
spark.dataSource.register(JDBCStreamDataSource)
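Before wrapping it in a pipeline, you can sanity-check the registered connector with a plain Structured Streaming read; the option values below are the same placeholders used in the pipeline definition that follows:

# Ad-hoc test of the connector outside of a pipeline (placeholder option values).
test_stream = (
    spark.readStream.format("sql_server_connector")
    .option("jdbc_url", "jdbc:sqlserver://;serverName=<serverName>;databaseName=<databaseName>;trustServerCertificate=false;encrypt=false")
    .option("jdbc_driver_name", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("jdbc_driver_jar_url", "/path/to/jar/file.jar")
    .option("username", "admin")
    .option("password", "password")
    .option("table", "dbo.employees")
    .option("progress_path", "</path/to/checkpoint>")
    .option("start_time", "2025-06-17 10:47:43")
    .load()
)
display(test_stream)  # or write it out with .writeStream for a longer-running test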
Finally, we wrap it in a Declarative Pipeline (formerly known as DLT) and pass the necessary parameters as options:
import dlt

@dlt.table(
    name="sql_server_table",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "pipelines.reset.allowed": "false",
    }
)
def sql_server_table():
    return (
        spark.readStream.format("sql_server_connector")
        .option("jdbc_url", "jdbc:sqlserver://;serverName=<serverName>;databaseName=<databaseName>;trustServerCertificate=false;encrypt=false")
        .option("jdbc_driver_name", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .option("jdbc_driver_jar_url", "/path/to/jar/file.jar")
        .option("username", "admin")
        .option("password", "password")
        .option("table", "dbo.employees")
        .option("progress_path", "</path/to/checkpoint>")
        .option("start_time", "2025-06-17 10:47:43")
        .load()
    )
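One practical note: rather than hard-coding credentials as in the example above, you would likely pull them from a Databricks secret scope. A minimal sketch, where the scope and key names are placeholders:

# Placeholder scope/key names; create the secrets with the Databricks CLI or API.
jdbc_username = dbutils.secrets.get(scope="sql-server", key="username")
jdbc_password = dbutils.secrets.get(scope="sql-server", key="password")

# Then reference them in the reader options instead of literals:
#   .option("username", jdbc_username)
#   .option("password", jdbc_password)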
By leveraging the new Spark Data Source API, you can build custom, incremental streaming connectors for your data sources with greater flexibility and control. This approach is especially useful for sources like SQL Server, Oracle, or any system with a reliable timestamp or unique key. Plus, you really only need to build one connector per source system, which is a huge bonus! Finally, wrap this all up inside Databricks Declarative Pipelines for the best performance and developer experience!
If you're interested in extending this further or exploring specific use cases, feel free to reach out!
Happy data-ingesting!