cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Databricks -Terraform- (condition_task)

RajaPalukuri
New Contributor II

Hi Team ,

I am planning to create IF/ELSE condition task in databricks using terraform code . My requirement is 

Task A ( Extract records from DB and Count recs) --> Task B ( validate the counts using Condition_task) --> Task c ( load data if Task B validate the counts >0)

I am able to implement it in databricks manually but trying to implement the same using Terraform code . Could you please help with this coditional task in data bricks . How can i refer "Total_record_counts' set in the task A is refered  in Task b using condition task.

 following sample code. 
 
   dynamic "task"{
     for_each = var.map_of_tables # this is map variable of tables 
   content {
task_key = "${var.env}_validate_total_rec_count_${lower(task.key)}"
run_if = "ALL_SUCCESS"
depends_on{
         task_key = "${var.env}_get_total_rec_count_${lower(task.key)}"   -- Task to get the record counts 
}
 
condition_task {         # conditional task 
  left = "{{tasks.${var.env}_get_total_rec_count_${lower(task.key)}.values.total_rec_count}}"    # facing issue with this how can i refer variable which was set in predecessor notebook task
        op = "GREATER_THAN"
        right = 0
      }
   

Any help with sample code will be helpful

 

 

4 REPLIES 4

Kaniz_Fatma
Community Manager
Community Manager

Hi @RajaPalukuri,

  1. Task A: Extract records from DB and count records

    • You’ve mentioned that you can implement this manually in Databricks. Great! You’ll need to create a notebook or job that extracts records from your database and calculates the total record count.
  2. Task B: Validate the counts using a condition task

    • In Terraform, you can use the Databricks Terraform provider to define your tasks. Specifically, you’ll want to use the condition_task block.
    • Here’s an example of how you can set up the condition task in your Terraform code:
    dynamic "task" {
      for_each = var.map_of_tables
      content {
        task_key = "${var.env}_validate_total_rec_count_${lower(task.key)}"
        run_if = "ALL_SUCCESS"
        depends_on = [
          "${var.env}_get_total_rec_count_${lower(task.key)}" # Task to get the record counts
        ]
    
        condition_task {
          left = "{{tasks.${var.env}_get_total_rec_count_${lower(task.key)}.values.total_rec_count}}"
          op = "GREATER_THAN"
          right = 0
        }
      }
    }
    
    • In the left expression, you’re referring to the total record count from the predecessor notebook task (Task A). The syntax {{tasks.${var.env}_get_total_rec_count_${lower(task.key)}.values.total_rec_count}} should work to reference the value set in Task A.
  3. Task C: Load data if Task B validates the counts > 0

    • You’ll need to define another task (Task C) that loads data based on the validation result from Task B. This task can be a Databricks notebook or any other relevant action.

Additionally, ensure that you’ve set up your Terraform provider configuration for Databricks.

If you encounter any issues or need further assistance, feel free to ask! 😊

 

Hi Kaniz,

Thank you for replying back to my request. Yes I am consolidating all table record counts using notebook task (Task A) and passing 'Total_rec_counts' to condition_task (Task B) . It validate the 'Total_rec_counts' and decide to run DLT pipeline. 

code 

 

 

dynamic "task" {
  for_each = var.map_of_tables
  content {
    task_key = "${var.env}_validate_total_rec_count_${lower(task.key)}"
    run_if = "ALL_SUCCESS"
    depends_on{
      Task_keY="${var.env}_get_total_rec_count_${lower(task.key)}" # Task to get the record counts
    }

    condition_task {
      left = "{{tasks.${var.env}_get_total_rec_count_${lower(task.key)}.values.total_rec_count}}"
      op = "GREATER_THAN"
      right = 0
    }
  }
}

 

Error :-  it is not able to identify left operand

Error: cannot update job: The "left" operand of the if/else condition alid reference. Invalid reference: '{{tasks.env-HLQ_UNIT_TEST_get_total_rec_count.values.total_rec_count}}'. '{{tasks.env-HLQ_UNIT_TEST_get_total_rec_count.values.total_rec_count}}' is unknown.

please suggest how to fix it . My dependency syntax is little different than yours. The syntax you proposed was giving error.

 

 

Hi Kaniz,

To give you more details above request , In task A where we consolidate all tables records counts we are doing it through notebook task through python program. ''Total_rec_counts' , is out put variable which is set in python program . How could we refer it in the validation task in Terraform ?. 

There is anyway we can create output variable in TASK A in terraform and refer in TASK B ?. If so how to define the output variable in Terraform in TASK A

 

hendrykarlar
New Contributor II

 

Implementing conditional logic in Databricks using Terraform involves setting up tasks and condition checks between them. Here's how you can structure your Terraform code to achieve the desired workflow:

Step 1: Define Databricks Notebooks as Tasks

Assume you have three tasks:

  • Task A: Extract records from a database and count records.
  • Task B: Validate the count obtained from Task A.
  • Task C: Load data if the count from Task B is greater than 0.

Step 2: Terraform Configuration

You'll need to define resources for each task and set up dependencies and condition checks between them.

Example Terraform Configuration:

provider "databricks" {
# Configuration for your Databricks provider
}

variable "env" {
description = "Environment identifier"
}

variable "map_of_tables" {
description = "Map of tables to process"
type = map(any)
}

# Task A: Extract records and count
resource "databricks_notebook" "task_a" {
for_each = var.map_of_tables

name = "${var.env}_extract_and_count_${each.key}"
content = <<EOF
// Your notebook content to extract and count records
val df = spark.read.format("jdbc").option("url", "<database_url>").load("<table_name>")
val total_rec_count = df.count()

// Print the count for validation in the next task
println(s"Total records: $total_rec_count")
EOF
}

# Task B: Validate counts using Condition_task
resource "databricks_notebook" "task_b" {
for_each = var.map_of_tables

name = "${var.env}_validate_count_${each.key}"
content = <<EOF
// Your notebook content to validate counts
val total_rec_count = dbutils.notebook.getContext().tags("total_rec_count").toInt

// Check if total_rec_ this website_count is greater than 0
if (total_rec_count > 0) {
println("Count validation successful!")
} else {
println("Count validation failed!")
}
EOF

depends_on = [databricks_notebook.task_a[each.key]]
}

# Task C: Load data if validation succeeds
resource "databricks_notebook" "task_c" {
for_each = var.map_of_tables

name = "${var.env}_load_data_${each.key}"
content = <<EOF
// Your notebook content to load data, if required
val total_rec_count = dbutils.notebook.getContext().tags("total_rec_count").toInt

// Example: Load data if count validation passed
if (total_rec_count > 0) {
// Your logic to load data
println("Loading data...")
} else {
println("No data to load.")
}
EOF

depends_on = [databricks_notebook.task_b[each.key]]
}

Explanation:

  • Task A (databricks_notebook.task_a): This notebook extracts records from a database and counts them. It sets a tag total_rec_count in the Databricks context, which can be used by subsequent tasks.

  • Task B (databricks_notebook.task_b): This notebook validates the count obtained from Task A. It retrieves the total_rec_count tag from the context and performs a validation (e.g., checking if the count is greater than 0).

  • Task C (databricks_notebook.task_c): This notebook loads data if the validation from Task B succeeds (i.e., total_rec_count is greater than 0). It depends on Task B to ensure it runs only after validation.

Notes:

  • Tag Usage: Note how dbutils.notebook.getContext().tags("total_rec_count") is used to retrieve the total_rec_count set by Task A. This ensures Task B and Task C can access the count for validation and loading, respectively.

  • Dependencies: Dependencies (depends_on) are set to ensure tasks run in the correct order. For example, Task B depends on Task A to complete successfully before it validates the count.

This setup allows you to define a structured workflow in Terraform for your Databricks tasks, including conditional logic based on the count of records extracted from the database. Adjust the notebook contents (content blocks) according to your specific logic and environment details.

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!