from pyspark.sql.types import StructField, StructType
def config_parser(config_file: str) -> dict:
    """
    Parse the YAML configuration file describing the Databricks job to run.

    Args:
        config_file: Path to a YAML file expected to contain a top-level
            ``job`` mapping (name, tasks, etc. — consumed by the script below).

    Returns:
        dict: ``{"job": {...}}`` holding a shallow copy of the parsed
        ``job`` mapping.

    Raises:
        ValueError: If the file cannot be opened or parsed, or the parsed
            document lacks a ``job`` key; the original exception is chained.
    """
    try:
        with open(config_file, encoding="utf-8") as f:
            configuration = yaml.safe_load(f)
        # Shallow copy of the "job" section; replaces the original
        # key-by-key loop that rebuilt the same dict one entry at a time.
        return {"job": dict(configuration["job"])}
    except Exception as ex:
        raise ValueError(
            "Failed to initialize ApplicationConfiguration, couldn't load YAML config!"
        ) from ex
# def create_tasks(tasks_list_input=None, cond_tasks_list_input=None, job_cond_tasks_list_input=None)-> List:
# email_notifications=jobs.JobEmailNotifications(tasks_dict["email_notifications"]),
def create_tasks(tasks_list_input) -> List:
    """
    Dynamically creates task objects based on the provided configurations.

    Args:
        tasks_list_input (list): List of task-config dicts. Each dict must
            provide ``description``, ``python_file``, ``parameters`` and
            ``task_key``; it may also provide ``depends_on`` (list of
            upstream task keys) and ``libraries`` (debug-printed only —
            not attached to the created task here).

    Returns:
        list: A list of Databricks ``jobs.Task`` objects, one per input dict.
    """
    print("Enter create_tasks")
    tasks_list_output = []
    for tasks_dict in tasks_list_input:
        # .get() so a missing optional "libraries" key no longer raises
        # KeyError just to feed a debug print.
        print("Libraries are ", tasks_dict.get("libraries"))
        task = jobs.Task(
            description=tasks_dict["description"],
            # All tasks share the single cluster defined at job-creation time.
            job_cluster_key="default_cluster",
            spark_python_task=jobs.SparkPythonTask(
                python_file=tasks_dict["python_file"],
                source=jobs.Source.WORKSPACE,
                parameters=tasks_dict["parameters"],
            ),
            task_key=tasks_dict["task_key"],
            timeout_seconds=0,  # 0 = no timeout
            # depends_on is optional; default to no upstream dependencies.
            depends_on=[
                jobs.TaskDependency(task_key=i) for i in tasks_dict.get("depends_on", [])
            ],
        )
        tasks_list_output.append(task)
    return tasks_list_output
# --- Script entry: build and submit the Databricks job from config3.yaml ---
yaml_dict = config_parser("config3.yaml")
job = yaml_dict["job"]
tasks = job["tasks"]
job_tasks = create_tasks(tasks)

created_job = w.jobs.create(
    # Nanosecond timestamp suffix keeps repeated runs from colliding on name.
    name=f"{job['name']}-{time.time_ns()}",
    job_clusters=[
        jobs.JobCluster(
            job_cluster_key="default_cluster",
            new_cluster=compute.ClusterSpec(
                spark_version=w.clusters.select_spark_version(long_term_support=True),
                node_type_id="Standard_DS3_v2",
                # num_workers and autoscale are mutually exclusive in the
                # Databricks cluster spec; the original passed both. Keep
                # autoscaling (2-6 workers) and drop the fixed num_workers=2.
                autoscale=AutoScale(min_workers=2, max_workers=6),
                data_security_mode=DataSecurityMode.NONE,
            ),
        ),
    ],
    tasks=job_tasks,
)