Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Migrating logic from Airflow DAGs to Databricks Workflows

jeremy98
Contributor

Hello community,
I'm planning to migrate some of the logic from our Airflow DAGs to Databricks Workflows, but I have a few doubts about how to map the logic in my current DAG code onto the corresponding Workflows concepts.

There are two points:

  1. How can I replicate Airflow's dynamic task mapping (.expand()) in a Databricks Workflow?
  2. Is there an equivalent of Airflow's get_current_context() to access runtime information inside a task?

7 REPLIES

hari-prasad
Valued Contributor II

Hi @jeremy98, you can leverage the Databricks dbutils.notebook.run() context to dynamically create jobs/tasks.

Refer to the following link for more details: https://docs.databricks.com/en/notebooks/notebook-workflows.html
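
For illustration, a minimal sketch of calling a child notebook this way from a parent notebook; the path "tasks/task_1" and the "run_date" parameter are assumptions:

# Run a child notebook and capture whatever it returns via dbutils.notebook.exit().
result = dbutils.notebook.run(
    "tasks/task_1",              # relative path to the child notebook (assumed)
    600,                         # timeout in seconds for the child run
    {"run_date": "2024-01-01"},  # parameters, exposed as widgets in the child notebook
)
print(result)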



Regards,
Hari Prasad

Hi Hari, thanks for your response! So, are you essentially suggesting moving forward by using notebooks to build these new workflows? In Airflow DAGs, we typically have a main DAG where we define tasks, and within each task, there are functionalities that call methods from Python files in our "custom libraries." Are you suggesting moving all these custom methods from the libraries I’ve defined into simple notebooks?

I'd also like to ask for suggestions from Databricks employees like @Alberto_Umana and @Walter_C. Thanks for your help, guys.

Walter_C
Databricks Employee

 

Dynamic Task Mapping

Databricks Workflows offers a similar concept to Airflow's dynamic task mapping through the "For each" task type. This allows you to run a task in a loop, passing different parameters to each iteration. Here's how you can replicate the functionality of Airflow's .expand() function:
  1. Create a "For each" task in your Databricks Workflow.
  2. Define the iterable items (similar to what you'd pass to .expand() in Airflow).
  3. Specify a nested task that will be executed for each item in the iterable.

For example, if you have a list of dates to process, you could set up a "For each" task that iterates over these dates and runs a notebook or Python wheel for each one.

Reference: https://docs.databricks.com/en/jobs/for-each.html 
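
For illustration, a rough sketch of how such a task could be declared in a Jobs API 2.1 payload, written here as a Python dict; the notebook path, the list of dates, and the "date" parameter name are assumptions:

# "For each" task that runs a nested notebook task once per date.
for_each_task = {
    "task_key": "process_dates",
    "for_each_task": {
        "inputs": '["2024-01-01", "2024-01-02", "2024-01-03"]',  # items to iterate over
        "concurrency": 2,                                        # parallel iterations
        "task": {
            "task_key": "process_one_date",
            "notebook_task": {
                "notebook_path": "/Workspace/Shared/process_date",  # assumed path
                "base_parameters": {"date": "{{input}}"},           # current iteration value
            },
        },
    },
}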

In Databricks Workflows, there isn't a direct equivalent to Airflow's get_current_context() function. However, you can access similar information through different means (see the sketch after the references below):

  1. Job Parameters: You can define job-level parameters that are accessible to all tasks within the workflow.

  2. Task Values: Databricks Workflows supports "Task Values," which allow you to set and retrieve small values from tasks. This can be used to pass information between tasks in a workflow.

  3. Dynamic Values: Databricks Workflows supports dynamic value references, which allow you to access certain runtime information. For example:
    • {{job.run_id}} gives you the current job run ID
    • {{job.start_time}} provides the job start time

  4. Notebook Parameters: If you're using notebook tasks, you can pass parameters to the notebook, which can include runtime information.

    Reference: https://docs.databricks.com/en/jobs/job-parameters.html and https://docs.databricks.com/en/jobs/task-parameters.html 
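
For illustration, a short sketch of how a notebook task might consume the options above; the parameter names and the upstream task key are assumptions:

# 1 / 4: job or notebook parameters arrive as widgets inside the notebook.
run_date = dbutils.widgets.get("run_date")

# 2: task values let tasks exchange small values.
dbutils.jobs.taskValues.set(key="row_count", value=42)  # publish for downstream tasks
rows = dbutils.jobs.taskValues.get(taskKey="ingest", key="row_count", default=0)  # read from an upstream task

# 3: dynamic values such as {{job.run_id}} are resolved by the job itself;
#    map one to a task parameter (e.g. run_id = {{job.run_id}}) and read it like any widget.
run_id = dbutils.widgets.get("run_id")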

Hello,

Thank you for your amazing response! You provided all the information I needed at this moment, and I truly appreciate it.

However, I have a doubt regarding the following example code:

(Old) file.py - Based on Airflow DAGs:

with (...):  
    @task  
    def task_1():  
        ...  
        process_email()  
        ...  

    @task  
    def task_2():  
        ...  

(New) file.py - Databricks Workflow:
Here, all task definitions are consolidated into a single notebook file, with callable functions that execute the required tasks based on the base_parameters passed in, for example function: "task_1" for one task and function: "task_2" for the other.
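
For clarity, a rough sketch of this single-notebook dispatcher pattern; the "function" parameter name and the import path are assumptions:

# One notebook serving every task; each workflow task passes
# base_parameters = {"function": "task_1"} or {"function": "task_2"}.
from my_custom_lib.emails import process_email  # assumed import from the wheel

def task_1():
    ...
    process_email()
    ...

def task_2():
    ...

TASKS = {"task_1": task_1, "task_2": task_2}
TASKS[dbutils.widgets.get("function")]()  # run the function named by the parameter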

But I think I made a mistake in the settings. Instead of properly splitting the old file.py into individual tasks (where each task corresponds to a separate notebook file), we replicated the entire structure for each task in one notebook.

Now, regarding the process_email method inside task_1:

  • Should I create a separate task for the process_email functionality?
  • Currently, I placed the process_email function inside the src folder and imported it using a wheel package. Is this the correct approach?

Additionally, could you clarify which parts of the code are most suitable for inclusion in a wheel package file?

Thank you again for your help!

Walter_C
Databricks Employee

Yes, it is a good practice to create a separate task for the process_email functionality. This will help in modularizing your code and make it easier to manage and debug. Each task should ideally perform a single responsibility, and separating process_email into its own task aligns with this principle.
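
For illustration, a minimal sketch of how the two tasks could be declared side by side in a job definition (Jobs API style, written as Python dicts); the notebook paths are placeholders:

tasks = [
    {
        "task_key": "task_1",
        "notebook_task": {"notebook_path": "/Workspace/Shared/task_1"},
    },
    {
        "task_key": "process_email",
        "depends_on": [{"task_key": "task_1"}],  # run only after task_1 finishes
        "notebook_task": {"notebook_path": "/Workspace/Shared/process_email"},
    },
]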

The parts of the code that are best suited for inclusion in a wheel package are (as sketched below the list):

  • Utility functions like process_email that are used across multiple tasks or notebooks.
  • Commonly used helper functions or classes that provide shared functionality.
  • Configuration settings or constants that need to be accessed by multiple components of your workflow.
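
For illustration, a hypothetical layout for such a wheel and how a task might use it; every name below is an assumption:

# my_workflow_utils/
# ├── pyproject.toml
# └── src/
#     └── my_workflow_utils/
#         ├── __init__.py
#         └── emails.py            # shared helpers such as process_email()

# src/my_workflow_utils/emails.py
def process_email(recipient: str, subject: str) -> None:
    """Shared helper packaged once in the wheel and reused by several tasks."""
    ...

# In a notebook or Python-wheel task, after the wheel is attached as a job library:
# from my_workflow_utils.emails import process_email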



Hello, thanks for your explanation, really helpful! One more question: for testing individual tasks, e.g. building unit tests or integration tests, how can I use databricks.yml to set this up?

 

Walter_C
Databricks Employee
