Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Incrementally load SQL Server table

erigaud
Honored Contributor

I am accessing an on-premises SQL Server table. The table is relatively small (10,000 rows), and I read it with

spark.read.jdbc(url=jdbcUrl, table=query)

Every day there are new records in the on-prem table that I would like to append to my bronze table on the lakehouse. However, there is no "InsertedOn" column or anything similar, and there are no obvious keys in the data that I could use to MERGE into my bronze table. So currently I am overwriting all the data every day, which does not seem like a good approach.
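Roughly, my daily job looks like this (the Delta path is a placeholder):

    # Full reload: re-read the whole source table and overwrite bronze
    df = spark.read.jdbc(url=jdbcUrl, table=query)
    df.write.format("delta").mode("overwrite").save("path_to_bronze_table")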
Is there a better way to incrementally load the data from SQL Server? Perhaps something using Structured Streaming?
 
Thank you!
2 REPLIES

Anonymous
Not applicable

If there is no explicit "InsertedOn" column or other obvious key to use for incremental loading, you can still incrementally load the on-premises SQL Server table into your Bronze table in the lakehouse by combining Spark with some additional logic. Streaming is not necessary here; ordinary DataFrame operations are enough to manage incremental loading efficiently. Here's a suggested approach:

1. Load the Bronze table into a DataFrame. Before performing incremental loading, load the existing data from your Bronze table:

    bronze_df = spark.read.format("delta").load("path_to_bronze_table")

2. Load the entire source table from SQL Server into another DataFrame:

    source_df = spark.read.jdbc(url=jdbcUrl, table=query)

3. Identify new records. Use DataFrame operations such as exceptAll or an anti-join to find the records in the source DataFrame that are not yet in the Bronze table (see the exceptAll sketch after this list):

    # Assuming there is a unique identifier column 'id', use it for comparison
    new_records_df = source_df.join(bronze_df, "id", "left_anti")

4. Append new records to the Bronze table. Once you have the DataFrame containing only the new records, append it:

    new_records_df.write.format("delta").mode("append").save("path_to_bronze_table")
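If no unique identifier exists, the exceptAll variant mentioned in step 3 compares entire rows rather than joining on a column. A minimal sketch, assuming both DataFrames have the same schema (same columns, same order, same types):

    # Rows present in the source but not yet in bronze, compared across all columns
    new_records_df = source_df.exceptAll(bronze_df)

    new_records_df.write.format("delta").mode("append").save("path_to_bronze_table")

Note that exceptAll treats any modified row as new, so rows updated in place in the source would be appended as additional rows.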

erigaud
Honored Contributor

As I said, there is no unique identifier in the table that would allow me to do any sort of join between my source table and my bronze table.
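One workaround in that situation is to derive a synthetic key by hashing every column, which makes the anti-join pattern above usable. A sketch only, assuming rows never change after insertion (the row_hash column name is hypothetical):

    from pyspark.sql import functions as F

    # Hypothetical synthetic key: SHA-256 over all columns cast to string.
    # concat_ws skips nulls, so rows differing only in null placement could collide.
    def with_row_hash(df):
        cols = [F.col(c).cast("string") for c in df.columns]
        return df.withColumn("row_hash", F.sha2(F.concat_ws("||", *cols), 256))

    # Anti-join on the hash to keep only rows not yet in bronze
    new_records_df = (
        with_row_hash(source_df)
        .join(with_row_hash(bronze_df).select("row_hash"), "row_hash", "left_anti")
        .drop("row_hash")
    )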