Data Engineering
Incrementally load SQL Server table

erigaud
Honored Contributor

I am accessing an on-premises SQL Server table. The table is relatively small (10,000 rows), and I access it using

    spark.read.jdbc(url=jdbcUrl, table=query)

Every day there are new records in the on-prem table that I would like to append to my bronze table on the lakehouse. However, there is no "InsertedOn" column or anything similar, and there are no obvious keys in the data that I could use to MERGE into my bronze table. So currently I am overwriting all the data every day, which does not seem like a good approach.

Is there a better way to incrementally load the data from SQL Server? Perhaps something using Structured Streaming?
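Concretely, the current daily refresh looks something like this (a minimal sketch; the bronze table path is a placeholder):

    # Current approach: re-read the entire source table over JDBC
    # and overwrite the bronze Delta table every day
    source_df = spark.read.jdbc(url=jdbcUrl, table=query)
    source_df.write.format("delta").mode("overwrite").save("path_to_bronze_table")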
 
Thank you!
2 REPLIES

Anonymous
Not applicable

If there is no explicit "InsertedOn" column or other obvious key to use for incremental loading, you can still load data incrementally from the on-premises SQL Server table into your Bronze table in the lakehouse with a combination of Spark and some additional logic. Streaming is probably not necessary in this case; plain Spark DataFrame operations can manage the incremental load efficiently. Here's a suggested approach:

1. Load the Bronze table into a DataFrame. Before performing the incremental load, read the existing data from your Bronze table:

    # Existing Bronze data, stored as a Delta table
    bronze_df = spark.read.format("delta").load("path_to_bronze_table")

2. Load the entire source table from SQL Server into another DataFrame:

    # Full read of the source table over JDBC
    source_df = spark.read.jdbc(url=jdbcUrl, table=query)

3. Identify new records. Use DataFrame operations such as exceptAll or an anti-join to find the rows in the source DataFrame that are not yet in the Bronze table. If a unique identifier exists, a left anti-join works; for the key-less case, see the exceptAll variant sketched after this list.

    # Assuming there is a unique identifier column 'id' to compare on
    new_records_df = source_df.join(bronze_df, "id", "left_anti")

4. Append the new records to the Bronze table:

    new_records_df.write.format("delta").mode("append").save("path_to_bronze_table")
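Since you mention there is no unique identifier, the exceptAll variant deserves a concrete sketch: exceptAll is a multiset difference that compares entire rows, so it needs no key column at all. A minimal sketch, assuming the source and Bronze DataFrames have identical schemas (same column order and types):

    # exceptAll returns the rows of source_df minus matching rows of
    # bronze_df, comparing all columns -- no join key required.
    # Duplicates are handled as a multiset difference.
    new_records_df = source_df.exceptAll(bronze_df)

    new_records_df.write.format("delta").mode("append").save("path_to_bronze_table")

One caveat: because whole rows are compared, a row that was updated in the source (rather than inserted) will show up as a brand-new row in Bronze.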

erigaud
Honored Contributor

As I said, there is no unique identifier in the table that would allow me to do any sort of Join between my source table and my bronze table. 
