How can I run notebooks concurrently using ProcessPoolExecutor in Python?

ETLdeveloper

Hello All,

My scenario requires code that reads tables from a source catalog and writes them to a destination catalog using Spark. Copying them one by one is not a good option when the catalog has 300 tables, so I am trying ProcessPoolExecutor, which hands the tables to separate worker processes so that as many as possible run concurrently. When I run my code, it throws an error like "PythonUtils does not exist in the JVM". I made two notebooks: one parent and one child. My code is below. Could you advise me on how to solve this error and implement parallelism for my task?

 

#parent notebook code

import datetime as dt
from concurrent.futures import ProcessPoolExecutor

def execute_child_notebook(table_name):
    try:
        # Get the notebook path
        notebook_path = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"

        # Execute the child notebook using dbutils.notebook.run
        # The argument key must match the widget name the child reads ("table_name")
        result = dbutils.notebook.run(notebook_path, 1800, {"table_name": table_name})
        return f"Execution result for {table_name}: {result}"

    except Exception as e:
        return f"Error executing notebook for {table_name}: {e}"

if __name__ == "__main__":
    # Define the list of table names to process
    table_names = ["table1", "table2"]

    try:
        # Create a ProcessPoolExecutor with default number of workers
        with ProcessPoolExecutor() as executor:
            # Map the execute_child_notebook function to process each table name in parallel
            results = executor.map(execute_child_notebook, table_names)

            # Iterate over results and print them
            for result in results:
                print(result)

    except Exception as e:
        print(f"Error: {e}")
 
#Child notebook code
 
from pyspark.sql import SparkSession

table_name = dbutils.widgets.get("table_name")  # Get the table_name from a widget

def read_write_spark_data(table_name):
    try:
        spark = SparkSession.builder.appName("MyJob").getOrCreate()

        # Construct the fully qualified source table name
        source_table = f"xxxxxxxxxxxxxxxxxxxxxxxxx.{table_name}"

        # Read data from the source table (limited to 100 rows)
        df = spark.read.table(source_table).limit(100)

        # Construct the fully qualified target table name
        target_table = f"yyyyyyyyyyyyyyyyyyyyyyyyy.{table_name}"

        # Write data to the target table
        df.write.saveAsTable(target_table)

        return f"Successfully read data from '{source_table}' and wrote to '{target_table}'."

    except Exception as e:
        print(f"Error: {e}")


# Execute the read_write_spark_data function
read_write_spark_data(table_name)
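
One direction that may be worth trying in the parent notebook is a thread pool instead of a process pool. Worker processes started by ProcessPoolExecutor most likely do not share the notebook's connection to the driver JVM, which would be consistent with the "PythonUtils does not exist in the JVM" error. Each dbutils.notebook.run call mostly waits for the child notebook to finish, so threads are usually enough for this kind of fan-out. The sketch below is an assumption-laden illustration, not a confirmed fix: `run_for_tables`, `run_one`, and the `max_workers` default are hypothetical names and values chosen for the example, and `run_one` stands in for whatever callable launches the child notebook.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_for_tables(table_names, run_one, max_workers=8):
    """Call run_one(table_name) for every table on a bounded pool of threads.

    run_one is a hypothetical callable supplied by the caller; in a Databricks
    parent notebook it would wrap dbutils.notebook.run.
    Returns (results, errors), two dicts keyed by table name.
    """
    results, errors = {}, {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit one task per table and remember which future belongs to which table
        futures = {executor.submit(run_one, name): name for name in table_names}
        for future in as_completed(futures):
            name = futures[future]
            try:
                results[name] = future.result()
            except Exception as exc:
                # Record the failure and keep going so one bad table does not stop the rest
                errors[name] = str(exc)
    return results, errors
```

In the parent notebook above, the call might look like `run_for_tables(table_names, lambda t: dbutils.notebook.run(notebook_path, 1800, {"table_name": t}))`, assuming `notebook_path` is defined at notebook level. Keeping `max_workers` well below 300 limits how many child notebook runs hit the cluster at once; the right value depends on the cluster and is something to tune.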