Load tables from JDBC in parallel

ADB0513
New Contributor III

I have a list of about 80 tables that I need to load from an Oracle database into Databricks via JDBC.  I would like to do this in parallel, instead of looping through one table at a time.

I have a function defined to ingest the data:

def ingest_data(database, table, primary_key):

There is some logic in my function, but the main part of it is loading the DataFrame over JDBC and writing it to a table in Databricks:

# url, user, password, lowerBound, upperBound and table_name are set
# earlier in ingest_data
df = (
    spark.read.format("jdbc")
    .option("url", url)
    .option("dbtable", table)
    .option("user", user)
    .option("password", password)
    .option("driver", "oracle.jdbc.driver.OracleDriver")
    # partitionColumn must be a numeric, date or timestamp column
    .option("partitionColumn", primary_key)
    .option("lowerBound", int(lowerBound))
    .option("upperBound", int(upperBound))
    # Up to 8 parallel reads, each over its own JDBC connection
    .option("numPartitions", 8)
    .load()
)
df.write.mode("overwrite").saveAsTable(table_name)
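For reference, `partitionColumn`, `lowerBound`, `upperBound` and `numPartitions` do not filter rows; Spark uses them only to split the read into `numPartitions` range queries, one per task. The function below is a simplified Python sketch of that splitting, based on Spark's `JDBCRelation.columnPartition` logic and assuming integer, non-negative bounds; the exact WHERE-clause text Spark generates may differ.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the WHERE clauses Spark builds for a partitioned JDBC read.

    Simplified sketch: assumes integer, non-negative bounds,
    num_partitions >= 2 and upper_bound - lower_bound >= num_partitions.
    """
    # Stride between partition boundaries (integer division, as in Spark)
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        # The first partition has no lower limit, so it also catches NULL keys
        lower = f"{column} >= {current}" if i != 0 else None
        current += stride
        # The last partition has no upper limit, so rows above upper_bound are still read
        upper = f"{column} < {current}" if i != num_partitions - 1 else None
        if upper is None:
            predicates.append(lower)
        elif lower is None:
            predicates.append(f"{upper} or {column} is null")
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


for p in jdbc_partition_predicates("ID", 0, 800, 8):
    print(p)
```

Because each of these predicates is read over its own connection, `numPartitions` = 8 means up to 8 concurrent Oracle sessions per table, which is worth keeping in mind once several tables are loaded at the same time.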
 
What is the best way to call this function multiple times in parallel? I have looked at ThreadPoolExecutor but have never used it. Also, I'm not sure how to call it, given that my function takes more than one parameter.
 
Any help is appreciated.
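Because `ThreadPoolExecutor.submit(fn, *args)` forwards any extra positional arguments to `fn`, a function with several parameters needs no wrapper. A minimal sketch, assuming the tables are described as hypothetical `(database, table, primary_key)` tuples and using a stand-in `ingest_data` that only returns a string so the example runs outside Databricks:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def ingest_data(database, table, primary_key):
    # Stand-in for the real function: replace the body with the JDBC read/write
    return f"{database}.{table} loaded (partitioned on {primary_key})"


# Hypothetical table list: (database, table, primary_key)
tables = [
    ("HR", "EMPLOYEES", "EMPLOYEE_ID"),
    ("HR", "DEPARTMENTS", "DEPARTMENT_ID"),
    ("SALES", "ORDERS", "ORDER_ID"),
]


def ingest_all(tables, max_workers=4):
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # submit() passes the extra arguments straight through to ingest_data
        futures = {executor.submit(ingest_data, *args): args[1] for args in tables}
        for future in as_completed(futures):
            table = futures[future]
            try:
                results[table] = future.result()
            except Exception as exc:
                # Record the failure instead of letting one table stop the others
                errors[table] = exc
    return results, errors


results, errors = ingest_all(tables)
print(results)
print(errors)
```

`executor.map(ingest_data, databases, table_names, primary_keys)` with three parallel lists works too. Spark accepts jobs from several driver threads, so each table's read and write run as separate Spark jobs; keep `max_workers` times `numPartitions` within the number of sessions the Oracle database allows.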
5 REPLIES

Ajay-Pandey
Esteemed Contributor III

Hi @ADB0513 
Yes, you can use ThreadPoolExecutor. Databricks is also planning a for_each activity that you can use to run the same notebook multiple times with different parameters.

Please find the code below for reference:

from concurrent.futures import ThreadPoolExecutor

class NotebookData:

  def __init__(self, path, timeout, parameters = None, retry = 0):

    self.path = path
    self.timeout = timeout
    self.parameters = parameters
    self.retry = retry

  @staticmethod
  def submit_notebook(notebook):
    # print("Running notebook for Table : %s " % (notebook.parameters['tableName']))
    try:
      if notebook.parameters:
        return dbutils.notebook.run(notebook.path, notebook.timeout, notebook.parameters)
      else:
        return dbutils.notebook.run(notebook.path, notebook.timeout)
    except Exception:
      if notebook.retry < 1:
        print("Failed For Notebook: ", notebook.parameters)
        raise
      # print("Retrying notebook for Table : %s " % (notebook.parameters['tableName']))
      notebook.retry = notebook.retry - 1
      # Retry through the class and return the retried run's result
      return NotebookData.submit_notebook(notebook)

def parallel_notebooks(notebooks, parallel_thread):
    """
        If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once.
        This code limits the number of parallel notebooks.
    """
    with ThreadPoolExecutor(max_workers = parallel_thread) as ec:
        return [ec.submit(NotebookData.submit_notebook, notebook) for notebook in notebooks]

# Array of instances of the NotebookData class
# notebook_path and var_dict must be defined beforehand
notebooks = [NotebookData(notebook_path, 3600, {"param1" : f'{param1}'}) for param1 in var_dict.values()]

parallel_thread = 60

try:
    res = parallel_notebooks(notebooks, parallel_thread)
    result = [i.result(timeout = 3600) for i in res] # This is a blocking call.
    print(result)
except Exception as e:
    print(e)
Ajay Kumar Pandey

ADB0513
New Contributor III

Hi @Ajay-Pandey, thank you for this. Here are some code snippets of what I am working on.

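import concurrent.futures

table_list = [{'table': 'table1'}, {'table': 'table2'}, {'table': 'table3'}]

def run_notebook(table):
    # timeout_seconds=0 means no timeout; each key/value sets the widget of the same name in ./Notebook
    return dbutils.notebook.run("./Notebook", 0, table)

max_workers = 5

with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    # list() consumes the results so an exception from any notebook run surfaces here
    results = list(executor.map(run_notebook, table_list))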

In the notebook being called, I have a widget that accepts the table value. How can I do this if I want to pass a tuple to my notebook, something like this?

table_list = [
    {'table': 'table1', 'attribute1': 'value1', 'attribute2': 'value2'},
    {'table': 'table2', 'attribute1': 'value8', 'attribute2': 'value19'},
    {'table': 'table3', 'attribute1': 'value17', 'attribute2': 'value475'},
]

How can I have the widget accept a tuple, or how can I convert the widget's string value to a tuple?

Ajay-Pandey
Esteemed Contributor III

Hi @ADB0513 

You can use a dictionary instead of a tuple and pass your values as multiple key-value pairs.
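`dbutils.notebook.run` takes its parameters as a dictionary, and each key sets the widget of the same name in the called notebook, where the value arrives as a string. A minimal sketch of both sides, assuming the `table_list` shape from the question; the child-side readers take the widget getter as an argument (in Databricks you would pass `dbutils.widgets.get`) so the sketch also runs outside a notebook. The single JSON-encoded widget named `config` is a hypothetical alternative for when the set of keys varies.

```python
import json


def run_args(row):
    # One widget per key; notebook arguments are passed as strings
    return {key: str(value) for key, value in row.items()}


def run_args_json(row):
    # Alternative: pack the whole row into one widget (hypothetical name "config")
    return {"config": json.dumps(row)}


def read_args(get_widget, keys=("table", "attribute1", "attribute2")):
    # In the child notebook, pass dbutils.widgets.get as get_widget
    return {key: get_widget(key) for key in keys}


def read_args_json(get_widget):
    return json.loads(get_widget("config"))


# Parent side in Databricks: dbutils.notebook.run("./Notebook", 0, run_args(row))
row = {"table": "table1", "attribute1": "value1", "attribute2": "value2"}
args = run_args(row)

# Simulate the child notebook reading its widgets from the passed arguments
print(read_args(args.get))
print(read_args_json(run_args_json(row).get))
```

If a tuple is really needed in the child notebook, `tuple(params[k] for k in keys)` converts the dictionary afterwards.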

Ajay Kumar Pandey

Ajay-Pandey
Esteemed Contributor III

Hi @ADB0513 

Databricks now supports for_each, which means you don't have to use multithreading; you can use a for_each task directly:
for_each activity
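A for_each task can also be defined through the Jobs API. The sketch below builds such a task definition in Python from the `table_list` shape used earlier in the thread. The field names (`for_each_task`, `inputs`, `concurrency`, `notebook_task`, `base_parameters`) and the `{{input.table}}` reference reflect my understanding of the Jobs API 2.1 and should be checked against the current API reference; the task keys and notebook path are hypothetical.

```python
import json

table_list = [
    {"table": "table1", "attribute1": "value1", "attribute2": "value2"},
    {"table": "table2", "attribute1": "value8", "attribute2": "value19"},
]


def for_each_ingest_task(rows, notebook_path, concurrency=5):
    return {
        "task_key": "ingest_tables",  # hypothetical task key
        "for_each_task": {
            # inputs is a JSON-encoded string, not a Python list
            "inputs": json.dumps(rows),
            # Maximum number of iterations allowed to run at the same time
            "concurrency": concurrency,
            "task": {
                "task_key": "ingest_one_table",  # hypothetical task key
                "notebook_task": {
                    "notebook_path": notebook_path,
                    # {{input.<key>}} refers to a field of the current element
                    "base_parameters": {
                        "table": "{{input.table}}",
                        "attribute1": "{{input.attribute1}}",
                        "attribute2": "{{input.attribute2}}",
                    },
                },
            },
        },
    }


task = for_each_ingest_task(table_list, "/Workspace/path/to/Notebook")  # hypothetical path
print(json.dumps(task, indent=2))
```

Here `concurrency` plays the role that `max_workers` played in the ThreadPoolExecutor versions, and each iteration runs as its own task run instead of a thread on the driver.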

Ajay Kumar Pandey

