I'm using the following custom Python function I found online as a stopgap. Your notebook needs to have parameters so you can pass in different runtime values for each iteration; the taskList is just the list of parameter sets, one per iteration.
# code downloaded from internet
# enables running notebooks in parallel
from concurrent.futures import ThreadPoolExecutor

class NotebookData:
    def __init__(self, path, timeout, parameters=None, retry=0):
        self.path = path
        self.timeout = timeout
        self.parameters = parameters
        self.retry = retry

def submitNotebook(notebook):
    print("Running notebook {} with params: {}".format(notebook.path, notebook.parameters))
    try:
        if notebook.parameters:
            return dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
        else:
            return dbutils.notebook.run(notebook.path, notebook.timeout)
    except Exception:
        print("FAILED: notebook {} with params: {}".format(notebook.path, notebook.parameters))
        if notebook.retry < 1:
            raise
        print("Retrying notebook {} with params: {}".format(notebook.path, notebook.parameters))
        notebook.retry = notebook.retry - 1
        return submitNotebook(notebook)  # return the retried result so the caller sees it

def parallelNotebooks(notebooks, numInParallel):
    # If you submit too many notebooks at once the driver may crash,
    # so this limits the number of notebooks running in parallel.
    with ThreadPoolExecutor(max_workers=numInParallel) as ec:
        return [ec.submit(submitNotebook, notebook) for notebook in notebooks]
# get the list of data sets to process from the config "db"
taskList = spark.sql("SELECT * FROM Config.DeltaJobsToRun").toPandas().to_dict('records')

# Create a queue of notebooks based on the metadata above, pass each row into the generic
# parameterised notebook and execute it.
# Note: the dbutils.notebook.run() command used here doesn't bubble up errors properly, so finding
# the cause of a failure currently means clicking through all of the 'Notebook job' links.
# TODO: investigate a logging wrapper that writes this info to a sensible location.
notebooks = []
for n in taskList:
    notebooks.append(NotebookData("./deltaLoadParam", 0, n, 1))

res = parallelNotebooks(notebooks, 2)
result = [f.result(timeout=3600) for f in res]  # this is a blocking call
print(result)
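For reference, the ./deltaLoadParam notebook itself isn't shown here; it just needs to read whatever parameters it is given. A minimal sketch of what it could look like - the widget names are made up, yours would match the column names in Config.DeltaJobsToRun:

# sketch of the parameterised child notebook (./deltaLoadParam)
# "source_table" / "target_table" are hypothetical parameter names
dbutils.widgets.text("source_table", "")
dbutils.widgets.text("target_table", "")

source_table = dbutils.widgets.get("source_table")
target_table = dbutils.widgets.get("target_table")

# ... do the delta load for this iteration ...

# whatever is passed to dbutils.notebook.exit() becomes the return value of dbutils.notebook.run()
dbutils.notebook.exit("OK: {} -> {}".format(source_table, target_table))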
Good luck finding the specific iteration of a run that failed with the above approach, though - dbutils.notebook.run does not return much useful information, nor does it bubble up exceptions in a way that tells you which run broke.
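One partial workaround (not part of the code above, just a sketch) is to zip the futures back up with their NotebookData when collecting results, so that each failure is at least logged next to the parameters that caused it:

# sketch: pair each future with its NotebookData so failures can be reported with their parameters
results = []
for notebook, future in zip(notebooks, res):
    try:
        results.append((notebook.parameters, future.result(timeout=3600)))
    except Exception as e:
        # future.result() re-raises the exception from submitNotebook,
        # so the failing iteration and its parameters show up here
        print("Run failed for params {}: {}".format(notebook.parameters, e))
        results.append((notebook.parameters, None))
print(results)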
I plan on removing this orchestration from Databricks entirely and handling it somewhere else.
To do that, create a workflow job for the parameterised notebook,
then use an external orchestration tool/script that calls the Databricks Jobs API to kick off the runs in parallel with the correct parameters.
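A minimal sketch of that last step, assuming a job has already been created for the parameterised notebook. The host, token and job id below are placeholders, and this just uses the Jobs API run-now endpoint with plain requests rather than any particular orchestration tool:

import requests

DATABRICKS_HOST = "https://<your-workspace>.azuredatabricks.net"  # placeholder
TOKEN = "<personal-access-token>"                                 # placeholder
JOB_ID = 123                                                      # placeholder id of the deltaLoadParam notebook job

def run_job(params):
    # trigger one run of the notebook job with this iteration's parameters
    resp = requests.post(
        DATABRICKS_HOST + "/api/2.1/jobs/run-now",
        headers={"Authorization": "Bearer " + TOKEN},
        json={"job_id": JOB_ID, "notebook_params": params},
    )
    resp.raise_for_status()
    return resp.json()["run_id"]

# one run per parameter set - task_list here stands for the same parameter
# records as taskList above, however you export them to the external script
run_ids = [run_job(p) for p in task_list]

# the orchestrator can then poll /api/2.1/jobs/runs/get?run_id=<id>
# to track the state and result of each individual run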