<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>article Ingesting Source Systems - Custom Data Source API JDBC Connections in Technical Blog</title>
    <link>https://community.databricks.com/t5/technical-blog/ingesting-source-systems-custom-data-source-api-jdbc-connections/ba-p/129653</link>
    <description>&lt;H1&gt;&lt;STRONG&gt;Simplifying Data Ingestion into Databricks with the New Spark Data Source API&lt;/STRONG&gt;&lt;/H1&gt;
&lt;P&gt;&lt;SPAN&gt;Are you ever frustrated trying to bring external data sources into Databricks? With the rapid growth of products, partners, and connectors, it can feel overwhelming to keep up. However, Databricks remains a first-class Data and AI platform capable of handling nearly any data ingestion scenario.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;Thanks to the evolving Lakehouse ecosystem, especially with the rapid development of Lakeflow Connect and its extensive connector ecosystem, ingesting data into Databricks has become more straightforward than ever. Still, with countless connectors being built—many of which may never be a priority—having a flexible, robust approach is essential.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;This is where the &lt;/SPAN&gt;&lt;STRONG&gt;new and improved Spark Data Source API&lt;/STRONG&gt;&lt;SPAN&gt; comes into play. If you're looking to read data—whether batch or streaming—into Databricks, and haven't found a reliable production-ready method yet, this blog is for you.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;Helpful Resources&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;Before diving deeper, I want to share some valuable references that complement the approach discussed here. These resources provide additional insights into data ingestion and custom data sources:&lt;/SPAN&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;A href="https://www.databricks.com/blog/simplify-data-ingestion-new-python-data-source-api" target="_blank" rel="noopener"&gt;&lt;SPAN&gt;Databricks Blog on Python Data Source API&lt;/SPAN&gt;&lt;/A&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;A href="https://community.databricks.com/t5/technical-blog/enhancing-the-new-pyspark-custom-data-sources-streaming-api/ba-p/75538" target="_blank" rel="noopener"&gt;&lt;SPAN&gt;Community Blog on Enhancing the PySpark Streaming API&lt;/SPAN&gt;&lt;/A&gt;&lt;/LI&gt;
&lt;/UL&gt;
&lt;P&gt;&lt;SPAN&gt;Note: This blog is the only one, to my knowledge, that explains how to implement JDBC connections specifically using the new Data Source API for both batch and streaming scenarios.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;Focus Topic: Ingesting SQL Server as a Streaming Source via JDBC&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;In this example, I'll demonstrate how to use the new API to ingest data from SQL Server in a streaming fashion, employing incremental load techniques to avoid full truncates and reloads. While I’ll focus on SQL Server, the method is applicable to any source that has a &lt;/SPAN&gt;&lt;STRONG&gt;unique key&lt;/STRONG&gt;&lt;SPAN&gt; (primary or composite) or a &lt;/SPAN&gt;&lt;STRONG&gt;transaction timestamp&lt;/STRONG&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;For this, I'll utilize an open-source JDBC Python package called &lt;/SPAN&gt;&lt;SPAN&gt;JaydeBeApi&lt;/SPAN&gt;&lt;SPAN&gt;, which I will wrap within custom &lt;/SPAN&gt;&lt;SPAN&gt;DataSource&lt;/SPAN&gt;&lt;SPAN&gt; and &lt;/SPAN&gt;&lt;SPAN&gt;DataSourceStreamReader&lt;/SPAN&gt;&lt;SPAN&gt; classes.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;Step-by-Step Guide with Code Examples&lt;/STRONG&gt;&lt;/H2&gt;
&lt;H3&gt;&lt;STRONG&gt;Step 1: Environment Setup&lt;/STRONG&gt;&lt;/H3&gt;
&lt;UL&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;SPAN&gt;If you're running on &lt;/SPAN&gt;&lt;STRONG&gt;Databricks&lt;/STRONG&gt;&lt;SPAN&gt;, ensure you're on &lt;/SPAN&gt;&lt;STRONG&gt;Databricks Runtime 15.3&lt;/STRONG&gt;&lt;SPAN&gt; or above, or using &lt;/SPAN&gt;&lt;STRONG&gt;Serverless&lt;/STRONG&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI style="font-weight: 400;" aria-level="1"&gt;&lt;SPAN&gt;You can also run this with open-source Spark 4.0.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;/UL&gt;
&lt;H3&gt;&lt;STRONG&gt;Step 2: Install and Import Packages&lt;/STRONG&gt;&lt;/H3&gt;
&lt;LI-CODE lang="python"&gt;# Install the JDBC bridge first if it is not already available, e.g. %pip install JayDeBeApi
import json
import os
import time
from datetime import datetime
from typing import Iterator, Tuple

import jaydebeapi
import requests
import pyspark.sql.functions as F
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter, InputPartition
from pyspark.sql.types import StructType, StructField, IntegerType, StringType&lt;/LI-CODE&gt;
&lt;H3&gt;&lt;STRONG&gt;Step 3: Define Your Custom Data Source&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;STRONG&gt;Create a subclass of the DataSource. &lt;/STRONG&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;
class JDBCStreamDataSource(DataSource):
    """Custom streaming data source that reads from SQL Server over JDBC."""

    @classmethod
    def name(cls):
        # Short name passed to spark.readStream.format(...)
        return "sql_server_connector"

    def schema(self):
        """Defines the output schema of the data source."""
        return StructType([
            StructField("user_id", IntegerType(), True),
            StructField("title", StringType(), True),
            StructField("salary", IntegerType(), True),
            StructField("update_time", StringType(), True),
        ])

    def streamReader(self, schema: StructType):
        # Hand the schema and the user-supplied options to the stream reader
        return JDBCStreamReader(schema, self.options)
&lt;/LI-CODE&gt;
&lt;P&gt;&lt;SPAN&gt;Within the &lt;/SPAN&gt;&lt;SPAN&gt;DataSource&lt;/SPAN&gt;&lt;SPAN&gt; class, the &lt;/SPAN&gt;&lt;SPAN&gt;schema&lt;/SPAN&gt;&lt;SPAN&gt; method defines the output schema of your streaming table. The schema can be generated dynamically and does not have to be hard-coded. The &lt;/SPAN&gt;&lt;SPAN&gt;name&lt;/SPAN&gt;&lt;SPAN&gt; method simply returns the name you want to give your Spark connector, i.e., &lt;/SPAN&gt;&lt;SPAN&gt;sql_server_connector&lt;/SPAN&gt;&lt;SPAN&gt;. The &lt;/SPAN&gt;&lt;SPAN&gt;streamReader&lt;/SPAN&gt;&lt;SPAN&gt; method returns the next class, &lt;/SPAN&gt;&lt;SPAN&gt;JDBCStreamReader&lt;/SPAN&gt;&lt;SPAN&gt;, which is covered in Step 4.&lt;/SPAN&gt;&lt;/P&gt;
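&lt;P&gt;&lt;SPAN&gt;As a rough illustration of a schema that is not hard-coded, the sketch below builds a DDL schema string from column metadata. It relies on the fact that the schema method may return a DDL-formatted string instead of a StructType. The helper name build_schema_ddl, the JDBC_TO_SPARK mapping, and the sample column list are hypothetical and not part of the original connector; in practice the column metadata might come from the source database's catalog.&lt;/SPAN&gt;&lt;/P&gt;

```python
# Hypothetical mapping from source type names to Spark SQL type names.
# Extend it for the types your source actually uses.
JDBC_TO_SPARK = {
    "int": "INT",
    "bigint": "BIGINT",
    "varchar": "STRING",
    "nvarchar": "STRING",
    "datetime": "STRING",  # kept as STRING to match the schema used in this post
}


def build_schema_ddl(columns):
    """Build a DDL schema string such as 'user_id INT, title STRING'.

    `columns` is a list of (column_name, source_type_name) pairs.
    Unknown source types fall back to STRING.
    """
    parts = []
    for name, source_type in columns:
        spark_type = JDBC_TO_SPARK.get(source_type.lower(), "STRING")
        parts.append(f"{name} {spark_type}")
    return ", ".join(parts)


# Sample metadata mirroring the four columns used in this post.
columns = [
    ("user_id", "int"),
    ("title", "varchar"),
    ("salary", "int"),
    ("update_time", "datetime"),
]
schema_ddl = build_schema_ddl(columns)
print(schema_ddl)
```

&lt;P&gt;&lt;SPAN&gt;A schema method could then return a string built this way rather than a fixed StructType, assuming the column metadata is available when the data source is created.&lt;/SPAN&gt;&lt;/P&gt;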
&lt;LI-CODE lang="python"&gt;class RangePartition(InputPartition):
    """One slice of the time range, read by a single Spark task."""

    def __init__(self, start, end):
        self.start = start
        self.end = end&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;The &lt;/SPAN&gt;&lt;SPAN&gt;RangePartition&lt;/SPAN&gt;&lt;SPAN&gt; class describes one slice of the ingest. Spark runs one task per partition, so the read is spread across as many Spark tasks as there are partitions. Partitioning is useful when your source system has high throughput (more than about 1,000 records per second, as a rough benchmark). Its use is shown in the next section.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Step 4: Read The Custom Data Source&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;Finally, we have the main class that actually reads and ingests the data. &lt;/SPAN&gt;&lt;SPAN&gt;Create a subclass of the &lt;/SPAN&gt;&lt;SPAN&gt;DataSourceStreamReader&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;class JDBCStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.jdbc_url = options.get("jdbc_url")
        self.jdbc_driver_name = options.get("jdbc_driver_name")
        self.jdbc_driver_jar_url = options.get("jdbc_driver_jar_url")
        self.username = options.get("username")
        self.password = options.get("password")
        self.table = options.get("table")
        self.progress_path = options.get("progress_path")
        self.start_time = options.get("start_time")
        # NOTE: `token` and `workspace_url` are not defined in this post. They are
        # assumed to be notebook-level variables holding a Databricks access token
        # and the workspace host name, defined before this class is used.
        self.token = token
        self.url = f"https://{workspace_url}/api/2.0/fs/files{self.progress_path}progress.json"
        self._load_progress()

    def initialOffset(self):
        """Only read records updated after the configured start_time."""
        return {"offset": self.start_time}

    def _load_progress(self):
        """Load the last saved offset from the progress file (0 if none is found)."""
        headers = {"Authorization": f"Bearer {self.token}"}
        response = requests.get(self.url, headers=headers)
        self.current = response.json().get("current", 0)

    def _save_progress(self):
        """Overwrite the progress file with the current offset."""
        url = f"{self.url}?overwrite=true"
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json",
        }
        data = json.dumps({"current": self.current})
        requests.put(url, headers=headers, data=data)

    def latestOffset(self) -&amp;gt; dict:
        """Use the current wall-clock time as the end of the next microbatch."""
        self.current = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """Split the time range between the two offsets into four RangePartitions."""
        fmt = "%Y-%m-%d %H:%M:%S"
        start_offset = datetime.strptime(start["offset"], fmt)
        end_offset = datetime.strptime(end["offset"], fmt)

        # Duration covered by each of the four partitions
        step = (end_offset - start_offset) / 4

        # Each range starts where the previous one ends
        ranges = [
            (
                (start_offset + step * i).strftime(fmt),
                (start_offset + step * (i + 1)).strftime(fmt),
            )
            for i in range(4)
        ]
        return [RangePartition(range_start, range_end) for range_start, range_end in ranges]

    def commit(self, end):
        """Called by Spark once a batch is done; persist the progress."""
        self._save_progress()

    def read(self, partition) -&amp;gt; Iterator[Tuple]:
        """Read records whose update_time falls within the partition's time range."""
        # Imported inside the method so it is resolved in the process that runs the task
        import jaydebeapi

        with jaydebeapi.connect(
            jclassname=self.jdbc_driver_name,
            url=self.jdbc_url,
            driver_args=[self.username, self.password],
            jars=self.jdbc_driver_jar_url,
        ) as conn:
            with conn.cursor() as cursor:
                cursor.execute(
                    f"SELECT * FROM {self.table} "
                    f"WHERE update_time &amp;gt; '{partition.start}' AND update_time &amp;lt;= '{partition.end}'"
                )
                rows = cursor.fetchall()

        # Emit one tuple per row, in the column order declared by the schema
        for r in rows:
            yield (r[0], r[1], r[2], r[3])&lt;/LI-CODE&gt;
&lt;P&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;Additional Notes: Key Concepts in the Implementation&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;There is a lot to unpack here. To save time, one of my great colleagues has explained many of the methods used within this approach; see their &lt;/SPAN&gt;&lt;A href="https://community.databricks.com/t5/technical-blog/enhancing-the-new-pyspark-custom-data-sources-streaming-api/ba-p/75538" target="_blank" rel="noopener"&gt;&lt;SPAN&gt;blog&lt;/SPAN&gt;&lt;/A&gt;&lt;SPAN&gt; for reference.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;There are a few important aspects worth highlighting:&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Partitioning with Ranges&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;The &lt;/SPAN&gt;&lt;SPAN&gt;partitions&lt;/SPAN&gt;&lt;SPAN&gt; method splits the time range between the start and end offsets into four equal subranges based on a timestamp column. Each subrange is wrapped in a &lt;/SPAN&gt;&lt;SPAN&gt;RangePartition&lt;/SPAN&gt;&lt;SPAN&gt; object, and Spark processes each partition in its own task, so every batch is read by four Spark tasks. This approach effectively parallelizes the stream, enabling scalable ingestion.&lt;/SPAN&gt;&lt;/P&gt;
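&lt;P&gt;&lt;SPAN&gt;The range-splitting logic can be tried on its own, outside Spark. The following is a minimal sketch, assuming offsets are strings in the same timestamp format used above; the function name split_time_range and its num_partitions parameter are hypothetical and simply generalize the fixed count of four. It returns plain tuples instead of RangePartition objects so that it runs without PySpark.&lt;/SPAN&gt;&lt;/P&gt;

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def split_time_range(start, end, num_partitions=4):
    """Split the span from `start` to `end` into contiguous (start, end) string pairs."""
    start_dt = datetime.strptime(start, FMT)
    end_dt = datetime.strptime(end, FMT)
    step = (end_dt - start_dt) / num_partitions

    # Interior boundaries are computed from the step; the final boundary is the
    # exact end offset so the last range always closes on `end`.
    bounds = [start_dt + step * i for i in range(num_partitions)] + [end_dt]
    return [
        (bounds[i].strftime(FMT), bounds[i + 1].strftime(FMT))
        for i in range(num_partitions)
    ]


ranges = split_time_range("2025-06-17 10:00:00", "2025-06-17 10:04:00")
for r in ranges:
    print(r)
```

&lt;P&gt;&lt;SPAN&gt;Because each range starts exactly where the previous one ends, a query that is exclusive on the lower bound and inclusive on the upper bound reads every row in the overall range once.&lt;/SPAN&gt;&lt;/P&gt;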
&lt;H3&gt;&lt;STRONG&gt;Incremental Processing and Progress Tracking&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;Incremental ingestion is achieved by tracking the latest offset, which in this implementation is a timestamp. For each batch, the &lt;/SPAN&gt;&lt;SPAN&gt;latestOffset()&lt;/SPAN&gt;&lt;SPAN&gt; method sets the end of the batch to the current time and stores it in &lt;/SPAN&gt;&lt;SPAN&gt;self.current&lt;/SPAN&gt;&lt;SPAN&gt;. Once the batch has been processed, &lt;/SPAN&gt;&lt;SPAN&gt;commit()&lt;/SPAN&gt;&lt;SPAN&gt; calls &lt;/SPAN&gt;&lt;SPAN&gt;_save_progress()&lt;/SPAN&gt;&lt;SPAN&gt;, which persists that value to a Unity Catalog volume (or other persistent storage). When the reader is next initialized, &lt;/SPAN&gt;&lt;SPAN&gt;_load_progress()&lt;/SPAN&gt;&lt;SPAN&gt; retrieves the saved value, so the last committed position is available when the stream restarts, which helps avoid missed or duplicated data.&lt;/SPAN&gt;&lt;/P&gt;
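&lt;P&gt;&lt;SPAN&gt;The save-and-reload cycle can be illustrated without a workspace. The sketch below is not the connector's implementation: it swaps the Files REST calls for a local JSON file, and the function names save_progress and load_progress, the temporary directory, and the sample timestamps are all assumptions made for illustration. It keeps the same JSON shape, a single "current" key.&lt;/SPAN&gt;&lt;/P&gt;

```python
import json
import os
import tempfile


def save_progress(path, current):
    """Persist the last committed offset as JSON, overwriting any earlier value."""
    with open(path, "w") as f:
        json.dump({"current": current}, f)


def load_progress(path, default=None):
    """Return the saved offset, or `default` if nothing has been saved yet."""
    if not os.path.exists(path):
        return default
    with open(path) as f:
        return json.load(f).get("current", default)


# Stand-in for the progress file on a Unity Catalog volume.
progress_file = os.path.join(tempfile.mkdtemp(), "progress.json")

# First run: nothing saved yet, so fall back to a configured start time.
first_run = load_progress(progress_file, default="2025-06-17 10:47:43")

# After a batch commits, persist its end offset and read it back.
save_progress(progress_file, "2025-06-17 11:00:00")
after_commit = load_progress(progress_file)
print(first_run, after_commit)
```

&lt;P&gt;&lt;SPAN&gt;Falling back to a default when no file exists mirrors the first run of a stream, when there is no saved progress to load.&lt;/SPAN&gt;&lt;/P&gt;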
&lt;H3&gt;&lt;STRONG&gt;Using Generators (&lt;/STRONG&gt;&lt;STRONG&gt;yield&lt;/STRONG&gt;&lt;STRONG&gt;) to Prevent Driver Overload&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
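&lt;P&gt;&lt;SPAN&gt;Lastly, &lt;/SPAN&gt;&lt;SPAN&gt;read&lt;/SPAN&gt;&lt;SPAN&gt; is a generator: it uses &lt;/SPAN&gt;&lt;SPAN&gt;yield&lt;/SPAN&gt;&lt;SPAN&gt; instead of &lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt;, so rows are handed to Spark one at a time rather than as one large in-memory result, which helps avoid overloading Spark. This matters most during the initial backfill, or when a scheduled run of this streaming job has a large incremental load to pick up.&lt;/SPAN&gt;&lt;/P&gt;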
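&lt;P&gt;&lt;SPAN&gt;To make the generator idea concrete, here is a small sketch that uses Python's built-in sqlite3 module as a stand-in for the JDBC connection, so it runs anywhere. It differs from the reader above in two labeled ways: it pulls rows in batches with the DB-API fetchmany call instead of fetchall, and it binds the timestamps as query parameters. The function name read_rows, the batch_size parameter, and the sample employees rows are hypothetical.&lt;/SPAN&gt;&lt;/P&gt;

```python
import sqlite3


def read_rows(conn, start, end, batch_size=2):
    """Yield rows updated after `start` and up to and including `end`."""
    cursor = conn.cursor()
    cursor.execute(
        "SELECT user_id, title, salary, update_time FROM employees "
        "WHERE update_time > ? AND ? >= update_time ORDER BY user_id",
        (start, end),
    )
    # Fetch a small batch at a time so the full result is never held in memory.
    while True:
        batch = cursor.fetchmany(batch_size)
        if not batch:
            break
        for row in batch:
            yield row


# In-memory table standing in for the source system.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE employees (user_id INTEGER, title TEXT, salary INTEGER, update_time TEXT)"
)
conn.executemany(
    "INSERT INTO employees VALUES (?, ?, ?, ?)",
    [
        (1, "Engineer", 100, "2025-06-17 10:00:00"),
        (2, "Analyst", 90, "2025-06-17 10:30:00"),
        (3, "Manager", 120, "2025-06-17 11:00:00"),
        (4, "Director", 150, "2025-06-17 11:30:00"),
    ],
)

rows = list(read_rows(conn, "2025-06-17 10:00:00", "2025-06-17 11:00:00"))
print(rows)
```

&lt;P&gt;&lt;SPAN&gt;Whether batching helps with a given JDBC driver depends on how that driver buffers results, so treat this as a pattern to test rather than a guaranteed improvement.&lt;/SPAN&gt;&lt;/P&gt;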
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;H3&gt;&lt;STRONG&gt;Step 5: Register and Wrap Connector In a Declarative Pipeline&lt;/STRONG&gt;&lt;/H3&gt;
&lt;P&gt;&lt;SPAN&gt;Now it’s time for the fun part!&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;Here we register the connector so that it can be used as a read format:&lt;/SPAN&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;spark.dataSource.register(JDBCStreamDataSource)&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;Finally, we wrap it in a Declarative Pipeline (formerly known as DLT) and pass the necessary parameters as options:&lt;/SPAN&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;import dlt


@dlt.table(
    name="sql_server_table",
    table_properties={
        "delta.enableChangeDataFeed": "true",
        "pipelines.reset.allowed": "false",
    },
)
def sql_server_table():
    return (
        spark.readStream.format("sql_server_connector")
        .option("jdbc_url", "jdbc:sqlserver://;serverName=&amp;lt;serverName&amp;gt;;databaseName=&amp;lt;databaseName&amp;gt;;trustServerCertificate=false;encrypt=false")
        .option("jdbc_driver_name", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .option("jdbc_driver_jar_url", "/path/to/jar/file.jar")
        # Placeholder credentials; do not hard-code real ones
        .option("username", "admin")
        .option("password", "password")
        .option("table", "dbo.employees")
        .option("progress_path", "&amp;lt;/path/to/checkpoint&amp;gt;")
        .option("start_time", "2025-06-17 10:47:43")
        .load()
    )&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;H2&gt;&lt;STRONG&gt;Wrap-up&lt;/STRONG&gt;&lt;/H2&gt;
&lt;P&gt;&lt;SPAN&gt;By leveraging the new Spark Data Source API, you can build custom, incremental streaming connectors for your data sources with greater flexibility and control. This approach is especially useful for sources like SQL Server, Oracle, or any system with a reliable timestamp or unique key. Plus, you really only need to build one connector per source system, which is a huge bonus! Finally, wrap it all up inside Databricks Declarative Pipelines for the best performance and developer experience!&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;If you're interested in extending this further or exploring specific use cases, feel free to reach out!&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;STRONG&gt;Happy data-ingesting!&lt;/STRONG&gt;&lt;/P&gt;</description>
    <pubDate>Tue, 23 Sep 2025 13:50:53 GMT</pubDate>
    <dc:creator>Zach_Jacobson23</dc:creator>
    <dc:date>2025-09-23T13:50:53Z</dc:date>
    <item>
      <title>Ingesting Source Systems - Custom Data Source API JDBC Connections</title>
      <link>https://community.databricks.com/t5/technical-blog/ingesting-source-systems-custom-data-source-api-jdbc-connections/ba-p/129653</link>
      <description>&lt;P&gt;&lt;SPAN&gt;Are you ever frustrated trying to bring external data sources into Databricks? With the rapid growth of products, partners, and connectors, it can feel overwhelming to keep up. However, Databricks remains a first-class Data and AI platform capable of handling nearly any data ingestion scenario.&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 23 Sep 2025 13:50:53 GMT</pubDate>
      <guid>https://community.databricks.com/t5/technical-blog/ingesting-source-systems-custom-data-source-api-jdbc-connections/ba-p/129653</guid>
      <dc:creator>Zach_Jacobson23</dc:creator>
      <dc:date>2025-09-23T13:50:53Z</dc:date>
    </item>
  </channel>
</rss>

