
Microbatching incremental updates Delta Live Tables

Erik_L
Contributor II

I need to create a workflow that pulls recent data from a database every two minutes, transforms that data in various ways, and appends the results to a final table. The problem is that some of these changes _might_ update existing rows in the final table, and I need to resolve the differences because only columns with new data should be updated. Data for a specific `event_time` can arrive late: for example, `did_foo_value_exceed_n` should be updated when a foo arrives for an older `event_time`.
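To make the required semantics concrete, here is a minimal pure-Python model (not Spark code) of a column-wise upsert keyed on `event_time`: an incoming value overwrites the stored one only when it is not `None`, and rows with a new `event_time` are inserted. The helper name `upsert_non_null` and the `bar_count` column are hypothetical, chosen only for illustration.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def upsert_non_null(table: Dict[Any, Row], updates: List[Row], key: str = "event_time") -> Dict[Any, Row]:
    """Merge partial rows into `table`, keyed by `key`.

    Only columns whose incoming value is not None overwrite the stored value;
    rows whose key is not present yet are inserted as-is.
    """
    merged = {k: dict(v) for k, v in table.items()}  # copy so the input is untouched
    for row in updates:
        k = row[key]
        if k not in merged:
            merged[k] = dict(row)  # new event_time: insert the whole row
            continue
        for col, value in row.items():
            if value is not None:  # late-arriving data fills in this column only
                merged[k][col] = value
    return merged


# Hypothetical data: a foo for an older event_time arrives in a later batch.
existing = {
    "12:00": {"event_time": "12:00", "bar_count": 3, "did_foo_value_exceed_n": None},
}
late_batch = [
    {"event_time": "12:00", "bar_count": None, "did_foo_value_exceed_n": True},
    {"event_time": "12:02", "bar_count": 1, "did_foo_value_exceed_n": False},
]
result = upsert_non_null(existing, late_batch)
print(result["12:00"])  # {'event_time': '12:00', 'bar_count': 3, 'did_foo_value_exceed_n': True}
```

One consequence of this rule is that an update can never reset a column back to null; if that is needed, the incoming rows would have to carry an explicit marker.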

Anyway, I attempted to do this in Delta Live Tables. However, you cannot read from a table defined later in the pipeline to join and merge changes before applying CDC. I also wrote a plain PySpark script that performs the merge with `DeltaTable`, but I can't combine it with a Delta Live Tables pipeline: Workflows don't allow separate compute (Delta Live Tables compute vs. Workflow compute) to access the same tables, so I can't consume the output of the Delta Live Tables pipeline.
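For a standalone PySpark merge, the "only update columns with new data" rule can be expressed with the `delta-spark` `DeltaTable` merge builder by setting each matched column to `coalesce(s.col, t.col)`, so a null in the incoming batch keeps the existing value. This is a sketch assuming the target table is keyed on `event_time` and the updates have the same columns; `build_coalesce_set` and `merge_non_null` are hypothetical helper names.

```python
from typing import Dict, Iterable


def build_coalesce_set(columns: Iterable[str], key: str = "event_time") -> Dict[str, str]:
    """Map each non-key column to a SQL expression that keeps the target (t)
    value whenever the incoming source (s) value is null."""
    return {c: f"coalesce(s.{c}, t.{c})" for c in columns if c != key}


def merge_non_null(spark, updates_df, table_name: str, key: str = "event_time") -> None:
    """Upsert `updates_df` into the Delta table `table_name`, column by column."""
    from delta.tables import DeltaTable  # delta-spark; available on Databricks clusters

    target = DeltaTable.forName(spark, table_name)
    (
        target.alias("t")
        .merge(updates_df.alias("s"), f"t.{key} = s.{key}")
        .whenMatchedUpdate(set=build_coalesce_set(updates_df.columns, key))
        .whenNotMatchedInsertAll()
        .execute()
    )


print(build_coalesce_set(["event_time", "did_foo_value_exceed_n"]))
# {'did_foo_value_exceed_n': 'coalesce(s.did_foo_value_exceed_n, t.did_foo_value_exceed_n)'}
```

Delta's MERGE fails when several source rows match the same target row, so the incoming batch should be deduplicated (or aggregated) on `event_time` before calling `merge_non_null`.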

The biggest issue is that I can't use a triggered workflow, because acquiring compute takes longer than running the pipeline itself. Is there any way to keep compute alive between Workflow runs?

3 replies

Kaniz
Community Manager

Hi @Erik_L, to avoid waiting for compute on every run, consider a long-running Databricks Job (one with a continuous trigger) instead of a triggered one. A long-running job keeps a persistent Spark context and cluster active, so the data transformations and merges can run continuously instead of starting fresh compute each time.
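As an illustration, the Jobs API 2.1 `jobs/create` request body accepts a `continuous` block; with a continuous trigger, Databricks starts a new run as soon as the previous one ends instead of waiting for a schedule. A minimal sketch of such a payload, assuming a single notebook task on an existing cluster; the job name, task key, notebook path, and cluster ID are placeholders, and `continuous_job_payload` is a hypothetical helper.

```python
import json


def continuous_job_payload(name: str, notebook_path: str, cluster_id: str) -> dict:
    """Build a Jobs API 2.1 `jobs/create` body for a continuously running job."""
    return {
        "name": name,
        "tasks": [
            {
                "task_key": "microbatch_merge",  # placeholder task name
                "notebook_task": {"notebook_path": notebook_path},
                "existing_cluster_id": cluster_id,
            }
        ],
        # Continuous trigger: a new run starts whenever the previous one ends.
        "continuous": {"pause_status": "UNPAUSED"},
    }


# Placeholder values; substitute your own workspace objects.
payload = continuous_job_payload(
    "incremental-merge", "/Workspace/Users/someone/merge_stream", "0000-000000-placeholder"
)
print(json.dumps(payload, indent=2))
```

The resulting body would be sent with `POST /api/2.1/jobs/create`; the same setting is available in the Jobs UI as the "Continuous" trigger type.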

Here's a step-by-step guide on setting up a long-running Databricks Job for these operations:

1. Create a Databricks Job: You can create the job through the Databricks UI or the Databricks CLI. Set its trigger type to "Continuous" so that it runs as a long-running job.

2. Set Up Dependencies: Make sure every Python library or package the job needs is installed on the cluster attached to the long-running job.

3. Define a Continuous Query: Inside the job, define a streaming query that performs the required transformations and merges. A Structured Streaming query runs continuously and writes the output of each micro-batch to a specified destination. Here's a simple example to start from:

 

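from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

# `spark` already exists in Databricks notebooks; getOrCreate() returns it.
spark = SparkSession.builder.getOrCreate()

# Define the schema of the source data
schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", StringType(), True),
])

# Read the source data as a stream. The JSON format and the path below are
# hypothetical placeholders; substitute the real source of your data.
source_df = spark.readStream.schema(schema).format("json").load("/tmp/example/source")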

4. Start the Job: Run the job to start the streaming query. As long as the query is running, the Spark context, cluster, and other resources stay available, so each new micro-batch is processed without waiting for compute.
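Steps 3 and 4 can be sketched end to end as a Structured Streaming query that merges each micro-batch into the final table through `foreachBatch` and fires every two minutes with `trigger(processingTime=...)`. This assumes a streaming source DataFrame (for example one created with `spark.readStream`), a target Delta table keyed on `event_time`, and a checkpoint path; `start_merge_stream`, `merge_condition`, the table name, and the path are hypothetical.

```python
def merge_condition(key: str = "event_time") -> str:
    """Join condition between target (t) and micro-batch source (s) rows."""
    return f"t.{key} = s.{key}"


def start_merge_stream(spark, source_df, table_name: str, checkpoint_path: str, key: str = "event_time"):
    """Start a streaming query that upserts every micro-batch into `table_name`."""
    from delta.tables import DeltaTable  # delta-spark; available on Databricks clusters

    def upsert_batch(batch_df, batch_id: int) -> None:
        # MERGE fails when several source rows match one target row, so keep one
        # row per key (an arbitrary one; aggregate instead if that loses data).
        deduped = batch_df.dropDuplicates([key])
        # Only overwrite a column when the incoming value is not null.
        updates = {c: f"coalesce(s.{c}, t.{c})" for c in deduped.columns if c != key}
        (
            DeltaTable.forName(spark, table_name).alias("t")
            .merge(deduped.alias("s"), merge_condition(key))
            .whenMatchedUpdate(set=updates)
            .whenNotMatchedInsertAll()
            .execute()
        )

    return (
        source_df.writeStream
        .foreachBatch(upsert_batch)
        .option("checkpointLocation", checkpoint_path)  # lets the query resume after a restart
        .trigger(processingTime="2 minutes")  # start a micro-batch every two minutes
        .start()
    )
```

In the job's notebook, calling `awaitTermination()` on the returned query keeps the run, and therefore its cluster, alive between micro-batches.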

By running the transformation and merge logic in a long-running Databricks Job, you keep the same compute available between micro-batches instead of acquiring a new cluster for every run.

I hope this helps, and if you have further questions or need more guidance, feel free to ask!

jose_gonzalez
Moderator

Hi @Erik_L ,

Just a friendly follow-up. Have you had a chance to review my colleague's response to your inquiry? Did it prove helpful, or are you still in need of assistance? Your response would be greatly appreciated.

Manisha_Jena
New Contributor III

Hi @Erik_L,

As my colleague mentioned, to keep compute available between runs, a long-running (continuous) Databricks Job is a reliable alternative to a triggered one. The job maintains a persistent Spark context, so the transformations and merges run without waiting for new compute each time.

Please let me know if it resolves the issue.
