
Lakeflow partial data ingestion for first load

NageshPatil
New Contributor III

Hi Team,

I am ingesting 10 tables from Azure SQL through Lakeflow Connect. I have created the gateway and ingestion pipelines using the Databricks SDK, and I start the ingestion pipeline only when the gateway is in Running status with resources.
I observed that in the first load only a few tables are ingested, and one big table (30 million rows) is only partially loaded. When I checked the gateway event log, the snapshot started only when the ingestion pipeline was triggered, but the ingestion does not wait for the snapshot to complete and instead loads whatever data is available from the cdc_stage table into the Delta tables.
How can I avoid this partial load on the first run and load all the historical data available in the source SQL database? Is there any configuration when creating the gateway, or any workaround?
P.S.: I also added a 15-minute sleep before triggering the ingestion pipeline to test this, but the snapshot still started only when the ingestion started.

#lakeflow #ingestion #lakeflowConnect

Nagesh Patil
4 REPLIES

pradeep_singh
Contributor III

You can probably add a step that monitors the gateway's event log (using the SDK) for each snapshot flow and waits until every table's snapshot has completed.

Thank You
Pradeep Singh - https://www.linkedin.com/in/dbxdev

SteveOstrowski
Databricks Employee

Hi @nagesh_patil,

This is expected behavior based on how the Lakeflow Connect database connector architecture works -- the ingestion gateway and ingestion pipeline are intentionally decoupled components that operate independently. Here is what is happening and how to address it.


WHY THIS HAPPENS

The Lakeflow Connect database connector has a two-component architecture:

1. Ingestion Gateway -- runs on classic compute as a continuous task. It extracts snapshots, change logs, and metadata from the source database and writes them to a staging volume (Unity Catalog volume).
2. Ingestion Pipeline -- runs on serverless compute on a schedule. It moves data from staging storage into destination streaming tables.

The key point is that the ingestion pipeline consumes whatever data is currently available in staging at the time it runs. It does NOT wait for the gateway snapshot to complete. So if you trigger the ingestion pipeline while the gateway is still performing the initial snapshot of your 10 tables, you will get partial results -- only the tables (or portions of tables) that have been fully snapshotted to staging so far.

The gateway status showing "Running" only means the gateway process has started -- it does NOT mean the initial snapshot for all tables is complete. A 15-minute sleep is likely not enough time for 10 tables (especially one with 30 million rows) to be fully snapshotted.

Docs: https://learn.microsoft.com/en-us/azure/databricks/ingestion/lakeflow-connect/


SOLUTION: MONITOR GATEWAY EVENT LOGS FOR SNAPSHOT COMPLETION

The recommended approach is to query the gateway event log to determine when the snapshot phase has completed for all your tables before triggering the ingestion pipeline. The gateway emits flow_progress events at regular intervals (default: every 5 minutes) for each table.

Here is a SQL query to check snapshot status per table:

SELECT
  timestamp,
  CONCAT(origin.catalog_name, '.', origin.schema_name, '.',
         origin.dataset_name) AS table_name,
  details:flow_progress.metrics.num_upserted_rows AS rows_upserted,
  details:flow_progress.metrics.num_deleted_rows AS rows_deleted,
  CASE
    WHEN LOWER(origin.flow_name) LIKE '%cdc%' THEN 'cdc'
    WHEN LOWER(origin.flow_name) LIKE '%snapshot%' THEN 'snapshot'
    ELSE 'unknown'
  END AS ingestion_phase
FROM event_log('<your-gateway-pipeline-id>')
WHERE event_type = 'flow_progress'
  AND level = 'METRICS'
  AND origin.pipeline_type = 'INGESTION_GATEWAY'
ORDER BY timestamp DESC

How to detect snapshot completion: The gateway uses two flow types per table:
- {catalog}.{schema}.{table}_snapshot_flow -- the initial full snapshot
- {catalog}.{schema}.{table}_cdc_flow -- ongoing incremental CDC

When a table's snapshot is complete, the gateway transitions from _snapshot_flow to _cdc_flow. So you can confirm all snapshots are done when every table shows _cdc_flow events.
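
As a quick manual check along those lines (a sketch reusing the query above; the pipeline ID is a placeholder), you can aggregate the latest flow type per table and look for tables that have not yet emitted any CDC events:

-- Tables whose latest_cdc_event is NULL are still in (or waiting for) their snapshot phase
SELECT
  CONCAT(origin.catalog_name, '.', origin.schema_name, '.',
         origin.dataset_name) AS table_name,
  MAX(CASE WHEN LOWER(origin.flow_name) LIKE '%snapshot%' THEN timestamp END) AS latest_snapshot_event,
  MAX(CASE WHEN LOWER(origin.flow_name) LIKE '%cdc%' THEN timestamp END) AS latest_cdc_event
FROM event_log('<your-gateway-pipeline-id>')
WHERE event_type = 'flow_progress'
  AND level = 'METRICS'
  AND origin.pipeline_type = 'INGESTION_GATEWAY'
GROUP BY 1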

Docs: https://docs.databricks.com/en/ingestion/lakeflow-connect/gateway-event-logs.html


RECOMMENDED WORKFLOW IN YOUR SDK CODE

Update your SDK automation to follow this pattern:

1. Deploy and start the gateway -- wait for status = "Running" (as you already do)
2. Poll the gateway event log -- query the event log table periodically (e.g., every 2-5 minutes) to check the ingestion_phase for each of your 10 tables
3. Wait until all tables show CDC phase -- this means the initial snapshot has completed for every table
4. Then trigger the ingestion pipeline -- at this point, all historical data will be available in staging

Here is a conceptual Python example:

import time
from databricks.sdk import WorkspaceClient

w = WorkspaceClient()
gateway_pipeline_id = "<your-gateway-pipeline-id>"

def all_snapshots_complete():
    # Query the gateway event log for the latest CDC event per table.
    result = w.statement_execution.execute_statement(
        warehouse_id="<your-warehouse-id>",
        statement=f"""
            SELECT
              CONCAT(origin.catalog_name, '.', origin.schema_name, '.',
                     origin.dataset_name) AS table_name,
              MAX(CASE WHEN LOWER(origin.flow_name) LIKE '%cdc%'
                       THEN timestamp END) AS latest_cdc
            FROM event_log('{gateway_pipeline_id}')
            WHERE event_type = 'flow_progress'
              AND level = 'METRICS'
              AND origin.pipeline_type = 'INGESTION_GATEWAY'
            GROUP BY 1
        """,
    )
    if result.result is None or result.result.data_array is None:
        return False  # statement still running or no events yet
    rows = result.result.data_array
    if len(rows) < 10:  # expecting 10 tables
        return False
    return all(row[1] is not None for row in rows)  # every table has CDC events

# Poll until snapshots complete
while not all_snapshots_complete():
    print("Waiting for gateway snapshots to complete...")
    time.sleep(300)  # check every 5 minutes

print("All snapshots complete -- triggering ingestion pipeline")
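
To complete step 4, here is a minimal sketch of triggering the ingestion pipeline once the polling loop above exits. It continues from the example above and assumes the standard databricks-sdk pipelines API; the ingestion pipeline ID is a placeholder:

ingestion_pipeline_id = "<your-ingestion-pipeline-id>"  # placeholder

# Start an ingestion pipeline update; with all snapshots finished, this run
# should pick up the full staged history rather than a partial load.
update = w.pipelines.start_update(pipeline_id=ingestion_pipeline_id)
print(f"Started ingestion pipeline update {update.update_id}")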


ADDITIONAL TIPS

- You do not need a full refresh after a partial first load. Simply let the gateway finish its snapshot, then re-run the ingestion pipeline. The pipeline will pick up all data staged since the last run.

- For the large 30M row table specifically, the snapshot may take a considerable amount of time. Monitor the rows_upserted metric in the event log to track progress.

- After the first full load completes, subsequent ingestion pipeline runs will only process incremental changes via CDC, so this wait-for-snapshot logic is primarily needed for the initial deployment.

- For Azure SQL specifically, Databricks recommends using change tracking over CDC when your tables have primary keys, as it is lighter weight on the source database.
Docs: https://learn.microsoft.com/en-us/azure/databricks/ingestion/lakeflow-connect/sql-server-source-setu...


DOCUMENTATION REFERENCES

- Managed connectors in Lakeflow Connect: https://learn.microsoft.com/en-us/azure/databricks/ingestion/lakeflow-connect/
- Monitor ingestion gateway with event logs: https://docs.databricks.com/en/ingestion/lakeflow-connect/gateway-event-logs.html
- Configure SQL Server for ingestion: https://learn.microsoft.com/en-us/azure/databricks/ingestion/lakeflow-connect/sql-server-source-setu...

Hope this helps! Let me know if you have questions about implementing the event log polling in your SDK workflow.

* This reply used an agent system I built to research and draft this response based on the wide set of documentation I have available and previous memory. I personally review the draft for any obvious issues and for monitoring system reliability and update it when I detect any drift, but there is still a small chance that something is inaccurate, especially if you are experimenting with brand new features.

NageshPatil
New Contributor III

@SteveOstrowski I am testing it now. I'll report back once it's working as expected, along with any additional code changes I make.

Nagesh Patil

emma_s
Databricks Employee

Hi, the recommended approach here is to simply run the ingestion pipeline multiple times on the initial load until all the data is captured. You can also monitor the snapshot-completed events in the gateway event log before triggering the ingestion, but that is more work than just re-running the pipeline until it's complete. There is also a private preview that could help with this; you'd need to speak to your account team about it.
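
For reference, a rough sketch of that re-run approach (assuming the databricks-sdk pipelines API; the ingestion pipeline ID and the run count are placeholders, and deciding when the data is fully captured is still up to you):

import time
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.pipelines import UpdateInfoState

w = WorkspaceClient()
ingestion_pipeline_id = "<your-ingestion-pipeline-id>"  # placeholder

# Re-run the ingestion pipeline a few times; each run drains whatever the
# gateway has staged so far, so the last run after the snapshot finishes
# captures the full history.
for attempt in range(3):  # arbitrary run count for illustration
    update = w.pipelines.start_update(pipeline_id=ingestion_pipeline_id)
    while True:
        info = w.pipelines.get_update(
            pipeline_id=ingestion_pipeline_id, update_id=update.update_id
        )
        if info.update.state in (
            UpdateInfoState.COMPLETED,
            UpdateInfoState.FAILED,
            UpdateInfoState.CANCELED,
        ):
            break
        time.sleep(60)  # poll once a minute
    print(f"Run {attempt + 1} finished with state {info.update.state}")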