Hello All,
My scenario requires code that reads tables from a source catalog and writes them to a destination catalog using Spark. Copying them one by one is not a good option when there are 300 tables in the catalog, so I am trying ProcessPoolExecutor, which creates a separate process for each table and runs them concurrently as far as possible. When I run the code, it throws an error like "PythonUtils does not exist in the JVM". Essentially, I made two notebooks: one parent and one child. I'm sharing my code below; could you advise me on how to fix this error and implement parallelism for my task?
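For reference, this is roughly how I plan to build the full list of table names to drive the copy. It is only a sketch: "source_catalog" and "source_schema" are placeholder names, and I assume the notebook's built-in spark session is available.

#building the table list (sketch; "source_catalog" and "source_schema" are placeholders)
tables_df = spark.sql("SHOW TABLES IN source_catalog.source_schema")
table_names = [row.tableName for row in tables_df.collect()]
print(f"Found {len(table_names)} tables to copy")
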
#parent notebook code
import datetime as dt
from concurrent.futures import ProcessPoolExecutor
def execute_child_notebook(table_name):
    try:
        # Path to the child notebook (placeholder)
        notebook_path = "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
        # Execute the child notebook using dbutils.notebook.run (timeout in seconds)
        result = dbutils.notebook.run(notebook_path, 1800, {"table_name": table_name})
        return f"Execution result for {table_name}: {result}"
    except Exception as e:
        return f"Error executing notebook for {table_name}: {e}"

if __name__ == "__main__":
    # Define the list of table names to process
    table_names = ["table1", "table2"]
    try:
        # Create a ProcessPoolExecutor with the default number of workers
        with ProcessPoolExecutor() as executor:
            # Map execute_child_notebook over the table names in parallel
            results = executor.map(execute_child_notebook, table_names)
            # Iterate over the results and print them
            for result in results:
                print(result)
    except Exception as e:
        print(f"Error: {e}")
#child notebook code
from pyspark.sql import SparkSession
import datetime as dt

table_name = dbutils.widgets.get("table_name")  # Get the table name passed in by the parent notebook

def read_write_spark_data(table_name):
    try:
        # In a Databricks notebook a SparkSession already exists; getOrCreate() returns it
        spark = SparkSession.builder.appName("MyJob").getOrCreate()
        # Construct the fully qualified source table name (placeholder catalog/schema)
        source_table = f"xxxxxxxxxxxxxxxxxxxxxxxxx.{table_name}"
        # Verify if the source table exists
        # Read data from the source table
        df = spark.read.table(source_table).limit(100)
        # Construct the fully qualified target table name (placeholder catalog/schema)
        target_table = f"yyyyyyyyyyyyyyyyyyyyyyyyy.{table_name}"
        # Write data to the target table
        df.write.saveAsTable(target_table)
        return f"Successfully read data from '{source_table}' and wrote to '{target_table}'."
    except Exception as e:
        return f"Error: {e}"

# Execute the copy and pass the outcome message back to the parent via dbutils.notebook.run
result = read_write_spark_data(table_name)
dbutils.notebook.exit(result)
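
In the child notebook I also still need to implement the "verify if the source table exists" step and decide on a write mode. A rough sketch of what I have in mind, assuming spark is already defined in the notebook, spark.catalog.tableExists is available on my runtime (Spark 3.3+), and "overwrite" mode is acceptable for reruns:

#child notebook sketch: existence check and explicit write mode (assumptions noted in comments)
def read_write_spark_data(table_name):
    source_table = f"xxxxxxxxxxxxxxxxxxxxxxxxx.{table_name}"
    target_table = f"yyyyyyyyyyyyyyyyyyyyyyyyy.{table_name}"
    # Assumes spark.catalog.tableExists is available (Spark 3.3+ / recent runtimes)
    if not spark.catalog.tableExists(source_table):
        return f"Skipped '{table_name}': source table not found."
    df = spark.read.table(source_table).limit(100)
    # "overwrite" is my assumption: it makes reruns idempotent but replaces existing data
    df.write.mode("overwrite").saveAsTable(target_table)
    return f"Copied rows from '{source_table}' to '{target_table}'."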