Zach_Jacobson23
Databricks Employee

Simplifying Data Ingestion into Databricks with the New Spark Data Source API

Are you ever frustrated trying to bring external data sources into Databricks? With the rapid growth of products, partners, and connectors, it can feel overwhelming to keep up. However, Databricks remains a first-class Data and AI platform capable of handling nearly any data ingestion scenario.

Thanks to the evolving Lakehouse ecosystem, especially with the rapid development of Lakeflow Connect and its extensive connector ecosystem, ingesting data into Databricks has become more straightforward than ever. Still, with countless connectors being built—many of which may never be a priority—having a flexible, robust approach is essential.

This is where the new and improved Spark Data Source API comes into play. If you're looking to read data—whether batch or streaming—into Databricks, and haven't found a reliable production-ready method yet, this blog is for you.

Helpful Resources

Before diving deeper, I want to share some valuable references that complement the approach discussed here. These resources provide additional insights into data ingestion and custom data sources:

Note: This blog is the only one, to my knowledge, that explains how to implement JDBC connections specifically using the new Data Source API for both batch and streaming scenarios.

Focus Topic: Ingesting SQL Server as a Streaming Source via JDBC

In this example, I'll demonstrate how to use the new API to ingest data from SQL Server in a streaming fashion, employing incremental load techniques to avoid full truncates and reloads. While I’ll focus on SQL Server, the method is applicable to any source that has a unique key (primary or composite) or a transaction timestamp.

For this, I'll utilize an open-source JDBC Python package called JayDeBeApi, which I will wrap within custom DataSource and DataSourceStreamReader classes.
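
Before wrapping anything in Spark classes, it can help to verify the JDBC driver jar and credentials on their own. Here is a minimal, standalone connectivity check (a sketch only; the server name, database name, jar path, and credentials are placeholders to replace with your own):

import jaydebeapi

# Placeholder connection details; substitute your own server, database, jar path, and credentials
conn = jaydebeapi.connect(
    jclassname="com.microsoft.sqlserver.jdbc.SQLServerDriver",
    url="jdbc:sqlserver://;serverName=<serverName>;databaseName=<databaseName>;encrypt=false",
    driver_args=["<username>", "<password>"],
    jars="/path/to/jar/file.jar",
)
cursor = conn.cursor()
cursor.execute("SELECT 1")
print(cursor.fetchall())  # expect [(1,)] if the connection is healthy
cursor.close()
conn.close()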

Step-by-Step Guide with Code Examples

Step 1: Environment Setup

  • If you're running on Databricks, ensure you're on Databricks Runtime 15.3 or above, or using Serverless.
  • You can also run this with open-source Spark 4.0.

Step 2: Install and Import Packages
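
The JDBC bridge typically isn't available on a Databricks cluster by default. A minimal, notebook-scoped install (the PyPI package name is JayDeBeApi) looks like the line below; you'll also need the SQL Server JDBC driver jar available at a path the cluster can read, such as a Unity Catalog volume:

%pip install JayDeBeApi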

import jaydebeapi
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter, InputPartition
import pyspark.sql.functions as F
from typing import Iterator, Tuple
import json
import os
import time
import requests
from datetime import datetime

Step 3: Define Your Custom Data Source

Create a subclass of the DataSource class:

class JDBCStreamDataSource(DataSource):
    @classmethod
    def name(cls):
        # Name used with spark.readStream.format(...) after registration
        return "sql_server_connector"

    def schema(self):
        """Defines the output schema of the data source."""
        return StructType([
            StructField("user_id", IntegerType(), True),
            StructField("title", StringType(), True),
            StructField("salary", IntegerType(), True),
            StructField("update_time", StringType(), True)
        ])

    def streamReader(self, schema: StructType):
        return JDBCStreamReader(schema, self.options)

 

 

Within the DataSource class, the schema method defines the output schema of your streaming table. This schema can be built dynamically and does not have to be hard-coded. The name method is simply what you want to call your Spark connector, i.e., sql_server_connector. The streamReader method returns the next class, JDBCStreamReader, which we will cover in the next section.
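
As an aside on making the schema dynamic: one option (a sketch, assuming you pass a hypothetical schema_ddl option when reading, and relying on StructType.fromDDL, which is available in PySpark 3.5+ and therefore covered by the runtime requirement above) is to derive the schema from a DDL string:

from pyspark.sql.types import StructType

def schema(self):
    # "schema_ddl" is a hypothetical option, e.g. "user_id INT, title STRING, salary INT, update_time STRING"
    ddl = self.options.get("schema_ddl", "user_id INT, title STRING, salary INT, update_time STRING")
    return StructType.fromDDL(ddl)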

 

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

 

The RangePartition class partitions your ingest across as many Spark tasks as there are partitions. Partitioning is useful when your source system has high throughput (as a rough benchmark, more than 1,000 records per second). You'll see it in action in the next section.

Step 4: Read The Custom Data Source

Finally, we have the main class that actually reads and ingests the data. Create a subclass of the DataSourceStreamReader.

 

class JDBCStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.jdbc_url = options.get("jdbc_url")
        self.jdbc_driver_name = options.get("jdbc_driver_name")
        self.jdbc_driver_jar_url = options.get("jdbc_driver_jar_url")
        self.username = options.get("username")
        self.password = options.get("password")
        self.table = options.get("table")
        self.progress_path = options.get("progress_path")
        #self.partitions = options.get("partitions")
        # token and workspace_url are assumed to be defined in the surrounding notebook
        # (e.g., a PAT and the workspace host used to call the Files API)
        self.token = token
        self.url = f"https://{workspace_url}/api/2.0/fs/files{self.progress_path}progress.json"
        self._load_progress()
        self.start_time = options.get("start_time")

    def initialOffset(self):
        """Only read records updated after the configured start_time."""
        return {"offset": self.start_time}

    def _load_progress(self):
        headers = {
            "Authorization": f"Bearer {self.token}",
        }
        response = requests.get(self.url, headers=headers)
        self.current = response.json().get('current', 0)

    def _save_progress(self):
        url = f"{self.url}?overwrite=true"
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }
        data = json.dumps({"current": self.current})
        response = requests.put(url, headers=headers, data=data)

    def latestOffset(self) -> dict:
        self.current = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        # Returns a sequence of partition objects covering the (start, end] window
        start_offset = datetime.strptime(start["offset"], "%Y-%m-%d %H:%M:%S")
        end_offset = datetime.strptime(end["offset"], "%Y-%m-%d %H:%M:%S")

        # Calculate total duration and split it into four equal sub-ranges
        total_duration = end_offset - start_offset
        step = total_duration / 4

        ranges = [
            (
                (start_offset + step * i).strftime("%Y-%m-%d %H:%M:%S"),
                (start_offset + step * (i + 1)).strftime("%Y-%m-%d %H:%M:%S")
            ) for i in range(4)]
        return [RangePartition(r[0], r[1]) for r in ranges]

        # Alternative: a single partition covering the whole window
        #return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end):
        # Called once a microbatch completes; persist the latest offset
        self._save_progress()

    def read(self, partition) -> Iterator[Tuple]:
        """Read records between a time range."""
        # Imported here so the executor processes that run read() have it available
        import jaydebeapi

        start_timestamp, end_timestamp = partition.start, partition.end
        with jaydebeapi.connect(
            jclassname=self.jdbc_driver_name,
            url=self.jdbc_url,
            driver_args=[self.username, self.password],
            jars=self.jdbc_driver_jar_url
        ) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"SELECT * FROM {self.table} "
                    f"WHERE update_time > '{start_timestamp}' AND update_time <= '{end_timestamp}'"
                )
                rows = cursor.fetchall()

        # Yield rows one at a time instead of returning them all at once
        for r in rows:
            yield (r[0], r[1], r[2], r[3])

 

Additional Notes: Key Concepts in the Implementation

There is a lot to unpack here. To save time, one of my great colleagues has explained many of the methods used within this approach; see their blog for reference.

There are a few important aspects worth highlighting:

Partitioning with Ranges

The partitions method splits the time window between the previous and latest offsets into four sub-ranges based on the timestamp column. Each sub-range is wrapped in a RangePartition object, and Spark processes the four partitions as parallel tasks. This approach effectively parallelizes the stream, enabling scalable ingestion.
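
As a concrete example of the math (with made-up offsets), a one-hour window splits into four 15-minute sub-ranges, each of which becomes a RangePartition handled by its own task:

from datetime import datetime

# Hypothetical window: the previous offset and the latest offset are one hour apart
start_offset = datetime.strptime("2025-06-17 10:00:00", "%Y-%m-%d %H:%M:%S")
end_offset = datetime.strptime("2025-06-17 11:00:00", "%Y-%m-%d %H:%M:%S")
step = (end_offset - start_offset) / 4  # 15 minutes per sub-range

ranges = [
    ((start_offset + step * i).strftime("%Y-%m-%d %H:%M:%S"),
     (start_offset + step * (i + 1)).strftime("%Y-%m-%d %H:%M:%S"))
    for i in range(4)
]
print(ranges)
# [('2025-06-17 10:00:00', '2025-06-17 10:15:00'),
#  ('2025-06-17 10:15:00', '2025-06-17 10:30:00'),
#  ('2025-06-17 10:30:00', '2025-06-17 10:45:00'),
#  ('2025-06-17 10:45:00', '2025-06-17 11:00:00')]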

Incremental Processing and Progress Tracking

Incremental ingestion is achieved by saving the latest offset, specifically the maximum timestamp seen during processing. For each microbatch, latestOffset() captures the current timestamp as the batch's end offset, and once the batch completes, commit() calls _save_progress(), which persists that timestamp to a Unity Catalog volume (or other persistent storage). When the stream restarts, _load_progress() retrieves the saved value to set the starting point, ensuring no data is missed or duplicated.
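
For reference, the persisted state is just a tiny JSON document stored as progress.json under the configured progress_path. After a batch whose latest offset was 10:47:43, it would look something like this (illustrative value):

{"current": "2025-06-17 10:47:43"}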

Using Generators (yield) to Prevent Driver Overload

 

Lastly, read() uses a generator (yield) instead of a return to avoid overloading the Spark driver with a fully materialized result set. This matters most during an initial backfill, or whenever a scheduled run of this streaming job has to catch up on a large incremental load.
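
As a minimal illustration of the difference (independent of any JDBC specifics), returning a list builds the entire result in memory before anything is emitted, while a generator hands rows downstream one at a time:

def read_all_rows(rows):
    # Builds the entire result in memory before returning it
    return [(r[0], r[1]) for r in rows]

def stream_rows(rows):
    # Emits one row at a time; memory stays flat regardless of row count
    for r in rows:
        yield (r[0], r[1])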

 

Step 5: Register and Wrap Connector In a Declarative Pipeline

Now it’s time for the fun part!

Here we register the connector for consumption:

spark.dataSource.register(JDBCStreamDataSource)

 

Finally, we wrap it in a Declarative Pipeline (formerly known as DLT) and pass the necessary connection parameters as options:

import dlt

@dlt.table(
    name="sql_server_table",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "pipelines.reset.allowed": "false",
    }
)
def sql_server_table():
    return (
        spark.readStream.format("sql_server_connector")
        .option("jdbc_url", "jdbc:sqlserver://;serverName=<serverName>;databaseName=<databaseName>;trustServerCertificate=false;encrypt=false")
        .option("jdbc_driver_name", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .option("jdbc_driver_jar_url", "/path/to/jar/file.jar")
        .option("username", "admin")       # prefer pulling credentials from a secret scope
        .option("password", "password")
        .option("table", "dbo.employees")
        .option("progress_path", "</path/to/checkpoint>")
        .option("start_time", "2025-06-17 10:47:43")
        .load()
    )
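
If you'd like to smoke-test the connector outside a pipeline first, a plain Structured Streaming write to a Delta table works as well (a sketch; the checkpoint location and target table name are placeholders, and the options mirror the pipeline example above):

(spark.readStream.format("sql_server_connector")
    .option("jdbc_url", "jdbc:sqlserver://;serverName=<serverName>;databaseName=<databaseName>;trustServerCertificate=false;encrypt=false")
    .option("jdbc_driver_name", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("jdbc_driver_jar_url", "/path/to/jar/file.jar")
    .option("username", "admin")
    .option("password", "password")
    .option("table", "dbo.employees")
    .option("progress_path", "</path/to/checkpoint>")
    .option("start_time", "2025-06-17 10:47:43")
    .load()
    .writeStream
    .option("checkpointLocation", "/path/to/stream/checkpoint")  # placeholder
    .trigger(availableNow=True)
    .toTable("catalog.schema.sql_server_table_test"))  # placeholder target table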

 

 

Wrap-up

By leveraging the new Spark Data Source API, you can build custom, incremental streaming connectors for your data sources with greater flexibility and control. This approach is especially useful for sources like SQL Server, Oracle, or any system with a reliable timestamp or unique key. Plus, you really only need to build one connector per source system, which is a huge bonus! Finally, wrap it all up inside Databricks Declarative Pipelines for the best performance and developer experience!

If you're interested in extending this further or exploring specific use cases, feel free to reach out!

Happy data-ingesting!