4 weeks ago
I have a list of about 80 tables that I need to load from an Oracle database into Databricks via JDBC. I would like to do this in parallel, instead of looping through one table at a time.
I have a function defined to ingest the data:
4 weeks ago
Hi @ADB0513
Yes, you can use ThreadPoolExecutor. Databricks is also planning a for_each activity that will let you call the same notebook multiple times with different parameters.
Please find the code below for your reference:
from concurrent.futures import ThreadPoolExecutor

class NotebookData:
    def __init__(self, path, timeout, parameters=None, retry=0):
        self.path = path
        self.timeout = timeout
        self.parameters = parameters
        self.retry = retry

    @staticmethod
    def submit_notebook(notebook):
        # print("Running notebook for Table : %s " % (notebook.parameters['tableName']))
        try:
            if notebook.parameters:
                return dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
            else:
                return dbutils.notebook.run(notebook.path, notebook.timeout)
        except Exception:
            if notebook.retry < 1:
                print("Failed For Notebook: ", notebook.parameters)
                raise
            # print("Retrying notebook for Table : %s " % (notebook.parameters['tableName']))
            notebook.retry = notebook.retry - 1
            # Return the retry's result; otherwise a successful retry yields None.
            return NotebookData.submit_notebook(notebook)

def parallel_notebooks(notebooks, parallel_thread):
    """
    If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
    This code limits the number of parallel notebooks.
    """
    with ThreadPoolExecutor(max_workers=parallel_thread) as ec:
        return [ec.submit(NotebookData.submit_notebook, notebook) for notebook in notebooks]

# notebook_path and var_dict are expected to be defined earlier in the calling notebook.
# Array of instances of the NotebookData class
notebooks = [NotebookData(notebook_path, 3600, {"param1": f'{param1}'}) for param1 in var_dict.values()]
parallel_thread = 60

try:
    res = parallel_notebooks(notebooks, parallel_thread)
    result = [i.result(timeout=3600) for i in res]  # This is a blocking call.
    print(result)
except NameError as e:
    print(e)
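One thing to watch in the snippet above: the list comprehension over `i.result(...)` stops at the first notebook that raises, so later failures are not reported. A minimal sketch of collecting every outcome with `as_completed` follows. The helper name `run_all` and the injectable `run_one` callable are assumptions made for illustration, not part of the original answer; on Databricks, `run_one` would wrap `dbutils.notebook.run`.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed

def run_all(items, run_one, max_workers=8):
    """Call run_one(item) for every item on a bounded thread pool.

    Returns (results, errors), two dicts keyed by the item's position in
    `items`, so one failing item does not hide the outcome of the others.
    """
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Map each future back to the index of the item it was created for.
        futures = {pool.submit(run_one, item): i for i, item in enumerate(items)}
        for fut in as_completed(futures):
            i = futures[fut]
            try:
                results[i] = fut.result()
            except Exception as exc:
                errors[i] = exc
    return results, errors

# Hypothetical usage on Databricks (dbutils exists only there):
#   results, errors = run_all(
#       notebooks,
#       lambda nb: dbutils.notebook.run(nb.path, nb.timeout, nb.parameters),
#       max_workers=8,
#   )
```

With around 80 tables, checking `errors` afterwards tells you exactly which loads need to be rerun.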
3 weeks ago
Hi @Ajay-Pandey, thank you for this. Here are some code snippets of what I am working on.
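import concurrent.futures

table_list = [{'table': 'table1'}, {'table': 'table2'}, {'table': 'table3'}]

def run_notebook(table):
    # A timeout of 0 means no timeout; each dict in table_list is passed as the notebook's arguments.
    return dbutils.notebook.run("./Notebook", 0, table)

max_workers = 5
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    # Consuming the iterator surfaces any exception raised inside a worker thread.
    results = list(executor.map(run_notebook, table_list))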
In my notebook that is being called I have a widget to accept the table value. How can I do this if I want to pass in a tuple to my notebook? So something like this?
table_list = [
    {'table': 'table1', 'attribute1': 'value1', 'attribute2': 'value2'},
    {'table': 'table2', 'attribute1': 'value8', 'attribute2': 'value19'},
    {'table': 'table3', 'attribute1': 'value17', 'attribute2': 'value475'},
]
How can I have the widget accept a tuple or how can I convert the string value of the widget to a tuple?
3 weeks ago
Hi @ADB0513
You can use a dictionary instead of a tuple and pass your values as multiple key-value pairs.
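Since notebook arguments and widget values are strings, a tuple has to be serialized if it must travel as a single value. The sketch below shows one way to do that with JSON. The helper names `to_widget_args` and `from_widget_value` and the `attributes` key are hypothetical, chosen only for this example.

```python
import json

def to_widget_args(row):
    """Turn one table_list entry into a str -> str dict.

    String values pass through unchanged; anything else (tuple, list,
    number) is JSON-encoded so it can be sent as a widget value.
    """
    return {k: v if isinstance(v, str) else json.dumps(v) for k, v in row.items()}

def from_widget_value(text):
    """Decode a JSON-encoded widget string back into a tuple."""
    return tuple(json.loads(text))

# Caller side: the tuple under the hypothetical 'attributes' key becomes a JSON string.
row = {"table": "table1", "attributes": ("value1", "value2")}
args = to_widget_args(row)

# Called notebook side (Databricks only, shown as comments):
#   table = dbutils.widgets.get("table")
#   attributes = from_widget_value(dbutils.widgets.get("attributes"))
attributes = from_widget_value(args["attributes"])
```

If every attribute is already a plain string, the flat dictionary from the question can be passed as is, and each key is read in the called notebook with its own `dbutils.widgets.get(...)` call.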
3 weeks ago
Hi @ADB0513
Databricks now supports for_each, so you no longer need multithreading; you can use the for_each activity directly.
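As a rough sketch of what a for_each task definition might look like, the helper below builds a task payload as a Python dict. The field names (`for_each_task`, `inputs`, `concurrency`, `task`) and the `{{input.table}}` reference reflect my understanding of the Databricks Jobs API and should be checked against the current API reference before use. The function name `build_for_each_task` and the task keys are hypothetical.

```python
import json

def build_for_each_task(task_key, notebook_path, rows, concurrency=5):
    """Build a for_each task payload (assumed Jobs API shape, verify before use).

    `rows` is a list of dicts such as table_list; each one becomes the input
    of one iteration, and `concurrency` caps how many iterations run at once.
    """
    return {
        "task_key": task_key,
        "for_each_task": {
            # The inputs are passed as a JSON-encoded array.
            "inputs": json.dumps(rows),
            "concurrency": concurrency,
            "task": {
                "task_key": f"{task_key}_iteration",
                "notebook_task": {
                    "notebook_path": notebook_path,
                    # Assumed syntax for referencing a key of the current input.
                    "base_parameters": {"table": "{{input.table}}"},
                },
            },
        },
    }

payload = build_for_each_task(
    "load_tables", "./Notebook", [{"table": "table1"}, {"table": "table2"}]
)
```

The resulting dict would sit in the `tasks` list of a job definition; the called notebook reads `table` from its widget exactly as it does with the threaded approach.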