06-21-2024 07:56 AM
Hi Team,
I am planning to create an IF/ELSE condition task in Databricks using Terraform code. My requirement is:
Task A (extract records from DB and count records) --> Task B (validate the counts using condition_task) --> Task C (load data if Task B validates that the count is > 0)
I am able to implement it in Databricks manually, but I am trying to implement the same using Terraform code. Could you please help with this conditional task in Databricks? How can the 'Total_record_counts' value set in Task A be referenced in Task B using a condition task?
Any help with sample code would be appreciated.
06-25-2024 10:36 AM
Hi @RajaPalukuri,
Task A: Extract records from DB and count records
Task B: Validate the counts using a condition task
Use a condition_task block:
dynamic "task" {
  for_each = var.map_of_tables
  content {
    task_key = "${var.env}_validate_total_rec_count_${lower(task.key)}"
    run_if   = "ALL_SUCCESS"
    depends_on = [
      "${var.env}_get_total_rec_count_${lower(task.key)}" # Task to get the record counts
    ]
    condition_task {
      left  = "{{tasks.${var.env}_get_total_rec_count_${lower(task.key)}.values.total_rec_count}}"
      op    = "GREATER_THAN"
      right = 0
    }
  }
}
In the left expression, you're referring to the total record count from the predecessor notebook task (Task A). The syntax {{tasks.${var.env}_get_total_rec_count_${lower(task.key)}.values.total_rec_count}} should work to reference the value set in Task A.
Task C: Load data if Task B validates the counts > 0
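For Task C, one way to run the load only when the condition passes is a depends_on block that names the condition task and sets outcome. The fragment below is a sketch, not a tested configuration: it belongs inside a databricks_job resource next to the Task B block, the pipeline_task is only a stand-in for the load step, and var.load_pipeline_id is a hypothetical variable.

```hcl
# Fragment: place inside the same databricks_job resource as the Task B block.
dynamic "task" {
  for_each = var.map_of_tables
  content {
    task_key = "${var.env}_load_data_${lower(task.key)}"

    depends_on {
      # Must match the task_key of the condition task (Task B)
      task_key = "${var.env}_validate_total_rec_count_${lower(task.key)}"
      # Run this task only when the condition evaluated to true
      outcome = "true"
    }

    pipeline_task {
      pipeline_id = var.load_pipeline_id # hypothetical variable, not from the thread
    }
  }
}
```

A task that depends on the same condition task with outcome = "false" would form the else branch.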
Additionally, ensure that you've set up your Terraform provider configuration for Databricks.
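A minimal sketch of such a provider configuration, assuming token-based authentication. The variable names databricks_host and databricks_token are illustrative, and other authentication methods are configured differently.

```hcl
terraform {
  required_providers {
    databricks = {
      source = "databricks/databricks"
    }
  }
}

# Illustrative input variables, not from the thread
variable "databricks_host" {
  description = "Workspace URL"
  type        = string
}

variable "databricks_token" {
  description = "Personal access token"
  type        = string
  sensitive   = true
}

provider "databricks" {
  host  = var.databricks_host
  token = var.databricks_token
}
```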
If you encounter any issues or need further assistance, feel free to ask!
06-27-2024 02:52 PM - edited 06-27-2024 02:58 PM
Hi Kaniz,
Thank you for replying to my request. Yes, I am consolidating all table record counts using a notebook task (Task A) and passing 'Total_rec_counts' to the condition_task (Task B). It validates 'Total_rec_counts' and decides whether to run the DLT pipeline.
Code:
dynamic "task" {
  for_each = var.map_of_tables
  content {
    task_key = "${var.env}_validate_total_rec_count_${lower(task.key)}"
    run_if   = "ALL_SUCCESS"
    depends_on {
      task_key = "${var.env}_get_total_rec_count_${lower(task.key)}" # Task to get the record counts
    }
    condition_task {
      left  = "{{tasks.${var.env}_get_total_rec_count_${lower(task.key)}.values.total_rec_count}}"
      op    = "GREATER_THAN"
      right = 0
    }
  }
}
Error: it is not able to identify the left operand.
Error: cannot update job: The "left" operand of the if/else condition alid reference. Invalid reference: '{{tasks.env-HLQ_UNIT_TEST_get_total_rec_count.values.total_rec_count}}'. '{{tasks.env-HLQ_UNIT_TEST_get_total_rec_count.values.total_rec_count}}' is unknown.
Please suggest how to fix it. My dependency syntax is a little different from yours; the syntax you proposed was giving an error.
06-28-2024 10:50 AM
Hi Kaniz,
To give you more details on the above request: in Task A, where we consolidate the record counts of all tables, we do it through a notebook task running a Python program. 'Total_rec_counts' is the output variable set in the Python program. How can we refer to it in the validation task in Terraform?
Is there any way we can create an output variable in Task A in Terraform and refer to it in Task B? If so, how do we define the output variable in Terraform in Task A?
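Task values are published by the notebook code at run time, so there is nothing to declare for them in Terraform: Terraform only defines the tasks and the reference string. The sketch below assumes the Task A notebook calls dbutils.jobs.taskValues.set with the key total_rec_count, that the key after .values. matches it exactly (including case), and that the reference names a task_key that exists in the same job. Building the key once in a local keeps the two spellings from drifting apart. The variables count_notebook_path and existing_cluster_id, the locals name, and the resource name are hypothetical.

```hcl
variable "count_notebook_path" {
  description = "Workspace path of the Task A notebook (hypothetical input)"
  type        = string
}

variable "existing_cluster_id" {
  description = "Cluster that runs the notebook task (hypothetical input)"
  type        = string
}

locals {
  # One task key per table, reused for task_key, depends_on and the reference
  get_count_task_keys = {
    for name, cfg in var.map_of_tables :
    name => "${var.env}_get_total_rec_count_${lower(name)}"
  }
}

resource "databricks_job" "count_and_validate" {
  name = "${var.env}_count_and_validate"

  # Task A: the notebook is expected to run, in Python:
  #   dbutils.jobs.taskValues.set(key="total_rec_count", value=<count>)
  dynamic "task" {
    for_each = var.map_of_tables
    content {
      task_key            = local.get_count_task_keys[task.key]
      existing_cluster_id = var.existing_cluster_id
      notebook_task {
        notebook_path = var.count_notebook_path
      }
    }
  }

  # Task B: condition on the value published by Task A
  dynamic "task" {
    for_each = var.map_of_tables
    content {
      task_key = "${var.env}_validate_total_rec_count_${lower(task.key)}"
      depends_on {
        task_key = local.get_count_task_keys[task.key]
      }
      condition_task {
        left  = "{{tasks.${local.get_count_task_keys[task.key]}.values.total_rec_count}}"
        op    = "GREATER_THAN"
        right = "0"
      }
    }
  }
}
```

If the job update still reports the reference as unknown, comparing the task key printed in the error with the task_key values in the plan output is a reasonable first check.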
06-30-2024 06:44 AM
Implementing conditional logic in Databricks using Terraform involves setting up tasks and condition checks between them. Here's how you can structure your Terraform code to achieve the desired workflow:
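Assume you have three tasks:
Task A: Extract records from the database and count them
Task B: Validate the count
Task C: Load data if the count is greater than 0
You'll need to define resources for each task and set up dependencies and condition checks between them.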
provider "databricks" {
  # Configuration for your Databricks provider
}
variable "env" {
  description = "Environment identifier"
}
variable "map_of_tables" {
  description = "Map of tables to process"
  type        = map(any)
}
# Task A: Extract records and count
resource "databricks_notebook" "task_a" {
  for_each = var.map_of_tables
  # Workspace path; the /Shared prefix is an assumption, adjust as needed
  path     = "/Shared/${var.env}_extract_and_count_${each.key}"
  language = "PYTHON"
  content_base64 = base64encode(<<-EOF
    # Your notebook content to extract and count records
    df = (spark.read.format("jdbc")
          .option("url", "<database_url>")
          .option("dbtable", "<table_name>")
          .load())
    total_rec_count = df.count()
    # Publish the count as a task value so later tasks in the job can read it
    dbutils.jobs.taskValues.set(key="total_rec_count", value=total_rec_count)
    EOF
  )
}
# Task B: Validate counts
resource "databricks_notebook" "task_b" {
  for_each = var.map_of_tables
  # Workspace path; the /Shared prefix is an assumption, adjust as needed
  path     = "/Shared/${var.env}_validate_count_${each.key}"
  language = "PYTHON"
  content_base64 = base64encode(<<-EOF
    # Read the count published by Task A. taskKey must match Task A's
    # task_key in the job; "extract_and_count" is an assumed name.
    total_rec_count = dbutils.jobs.taskValues.get(
        taskKey="extract_and_count", key="total_rec_count", default=0, debugValue=0)
    # Check if total_rec_count is greater than 0
    if total_rec_count > 0:
        print("Count validation successful!")
    else:
        print("Count validation failed!")
    EOF
  )
  # depends_on takes a static resource reference, without an index
  depends_on = [databricks_notebook.task_a]
}
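# Task C: Load data if validation succeeds
resource "databricks_notebook" "task_c" {
  for_each = var.map_of_tables
  # Workspace path; the /Shared prefix is an assumption, adjust as needed
  path     = "/Shared/${var.env}_load_data_${each.key}"
  language = "PYTHON"
  content_base64 = base64encode(<<-EOF
    # Read the count published by Task A. taskKey must match Task A's
    # task_key in the job; "extract_and_count" is an assumed name.
    total_rec_count = dbutils.jobs.taskValues.get(
        taskKey="extract_and_count", key="total_rec_count", default=0, debugValue=0)
    # Example: load data only if the count validation passed
    if total_rec_count > 0:
        # Your logic to load data
        print("Loading data...")
    else:
        print("No data to load.")
    EOF
  )
  # depends_on takes a static resource reference, without an index
  depends_on = [databricks_notebook.task_b]
}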
Task A (databricks_notebook.task_a): This notebook extracts records from a database and counts them. For later tasks to use the count, the notebook has to publish it, for example as a task value named total_rec_count.
Task B (databricks_notebook.task_b): This notebook validates the count obtained from Task A: it reads total_rec_count and checks that it is greater than 0.
Task C (databricks_notebook.task_c): This notebook loads data only if total_rec_count is greater than 0. It should run after Task B so that the load happens only after validation.
Passing the count: Task B and Task C need the count produced by Task A. In a Databricks job, a Python notebook can publish it with dbutils.jobs.taskValues.set and a downstream notebook can read it with dbutils.jobs.taskValues.get. Notebook context tags do not carry values between tasks.
Dependencies: depends_on on these resources controls the order in which Terraform creates the notebooks. The order in which the tasks run is set separately, by the depends_on blocks inside the job's task definitions.
This setup allows you to define a structured workflow in Terraform for your Databricks tasks, including conditional logic based on the count of records extracted from the database. Adjust the notebook contents (content blocks) according to your specific logic and environment details.
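The notebook resources alone do not run anything; a job has to reference them. The sketch below shows one way to wire them up, assuming one job per table, that the databricks_notebook resources above expose a workspace path, and that the notebooks run on an existing cluster. The variable notebook_cluster_id, the resource name per_table, and the three task keys are hypothetical.

```hcl
variable "notebook_cluster_id" {
  description = "ID of an existing cluster to run the notebooks on (hypothetical input)"
  type        = string
}

resource "databricks_job" "per_table" {
  for_each = var.map_of_tables
  name     = "${var.env}_count_validate_load_${each.key}"

  # Task A: extract and count
  task {
    task_key            = "extract_and_count"
    existing_cluster_id = var.notebook_cluster_id
    notebook_task {
      notebook_path = databricks_notebook.task_a[each.key].path
    }
  }

  # Task B: runs only after Task A has succeeded
  task {
    task_key            = "validate_count"
    existing_cluster_id = var.notebook_cluster_id
    depends_on {
      task_key = "extract_and_count"
    }
    notebook_task {
      notebook_path = databricks_notebook.task_b[each.key].path
    }
  }

  # Task C: runs only after Task B has succeeded
  task {
    task_key            = "load_data"
    existing_cluster_id = var.notebook_cluster_id
    depends_on {
      task_key = "validate_count"
    }
    notebook_task {
      notebook_path = databricks_notebook.task_c[each.key].path
    }
  }
}
```

With this layout the branching happens inside the notebooks. Replacing the validation notebook with a condition_task moves the check into the job definition, so the load task is skipped rather than started when the count is 0.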