Is it possible to use multiprocessing or threads to submit multiple queries to a database from Databricks in parallel?

vanepet
New Contributor II

We are trying to improve our overall runtime by running queries in parallel using either multiprocessing or threads. What I am seeing, though, is that when the function that runs this code is executed in a separate process, it doesn't return a DataFrame with any info.

Sample Code

import multiprocessing

def GetData(job_context, group_numbers):
    sqlquery = """
                SELECT *
                FROM
                    recon.v_ods_payment
                    WHERE group_number in({0})""".format(group_numbers)
       
    payments_df = job_context.create_dataframe_from_staging(sqlquery)
    print(payments_df)
    #display(payments_df)
    return payments_df
 
 
def DoWork():
    job_list = ["payments.payments_job"]
    processes = []

    for job in job_list:
        try:
            p = multiprocessing.Process(target=GetData, args=(job_context, "0030161"))
            p.start()
            processes.append(p)
        except Exception as ex:
            status = "CL_Failed"
            message = f"The job {job} failed due to an exception. Please refer to Databricks for more information. Error: {repr(ex)}"
            job_context.logger_error(message)
            raise ex

    for p in processes:
        p.join()

The error I can see in the logs is as follows:

  File "<command-2328844487947527>", line 13, in GetData
    print(payments_df)
  File "/databricks/spark/python/pyspark/sql/dataframe.py", line 452, in __repr__
    return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
  File "/databricks/spark/python/pyspark/sql/dataframe.py", line 1040, in dtypes
    return [(str(f.name), f.dataType.simpleString()) for f in self.schema.fields]
  File "/databricks/spark/python/pyspark/sql/dataframe.py", line 257, in schema
    self._schema = _parse_datatype_json_string(self._jdf.schema().json())
  File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/databricks/spark/python/pyspark/sql/utils.py", line 127, in deco
    return f(*a, **kw)
  File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 332, in get_return_value
    format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o1361.schema. Trace:
py4j.Py4JException: Method schema([]) does not exist

This runs totally fine when not run in a separate process.

Any ideas or help would be greatly appreciated.

5 REPLIES

Hubert-Dudek
Esteemed Contributor III

Spark, by default, works in a distributed way and processes data in parallel using native Spark.

It will use all cores; every core will process one partition and write it to the database.

The same applies to selects.

So, for example, you can use the code below to read SQL in parallel:

table = (spark.read
  .format("jdbc")
  .option("url", "<jdbc_url>")
  .option("dbtable", "<table_name>")
  .option("user", "<username>")
  .option("password", "<password>")
  # a column with a uniformly distributed range of values that can be used for parallelization
  .option("partitionColumn", "<partition_key>")
  # lowest value of the partitionColumn to pull data for
  .option("lowerBound", "<min_value>")
  # highest value of the partitionColumn to pull data for
  .option("upperBound", "<max_value>")
  # number of partitions to distribute the data into; use sc.defaultParallelism to match the number of cores
  .option("numPartitions", sc.defaultParallelism)
  .load())

How would I achieve this if I wanted to run queries in parallel for different tables?

Like

DF = Select * from table a where group =1

DF2 = Select * from table b where group =1

Soma
Valued Contributor

huyd
New Contributor III

This might help: https://dustinvannoy.com/2022/05/06/parallel-ingest-spark-notebook/

Write a function that passes your table list, as an array, into a queue for threading.
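
A minimal sketch of that idea, using ThreadPoolExecutor rather than a hand-rolled queue. The table names and JDBC connection details are placeholders, and it assumes a Databricks notebook where spark is already defined; because threads share the driver's SparkSession, each read simply submits another Spark job and the cluster runs them concurrently.

from concurrent.futures import ThreadPoolExecutor

# Hypothetical table list -- replace with your own.
tables = ["recon.v_ods_payment", "recon.v_ods_invoice"]

def read_table(table_name):
    # Each call runs on its own thread but uses the shared SparkSession,
    # so the reads are submitted to the cluster in parallel.
    return (spark.read
            .format("jdbc")
            .option("url", "<jdbc_url>")
            .option("dbtable", table_name)
            .option("user", "<username>")
            .option("password", "<password>")
            .load())

with ThreadPoolExecutor(max_workers=len(tables)) as pool:
    dataframes = dict(zip(tables, pool.map(read_table, tables)))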

BapsDBS
New Contributor II

Thanks for the links mentioned above, but both of them use raw Python to achieve parallelism. Does this mean Spark (read: PySpark) doesn't exactly provide for parallel execution of functions, or even notebooks?

We used a wrapper notebook with ThreadPoolExecutor to get our job done. That part is purely Python: the wrapper notebook spawns the same child notebook with different parameters, and the child notebooks use Spark DataFrames/SQL. It is running, and all I can say is that we are not exactly unhappy with it. It helped even more when we scaled our environment horizontally: four clusters, each loading around 200-250 files to Delta Lake, in total around 24 GB of data within 2-3 hours, in an incremental fashion.
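
For reference, a sketch of that wrapper pattern; the child notebook path and parameter names are purely illustrative. dbutils.notebook.run is the standard Databricks call for launching a child notebook from Python, and it blocks until the child exits.

from concurrent.futures import ThreadPoolExecutor

# Hypothetical parameter sets -- one child notebook run per entry.
param_sets = [{"group_number": "0030161"},
              {"group_number": "0030162"}]

def run_child(params):
    # Arguments: notebook path, timeout in seconds, widget parameters.
    # The return value is whatever the child passes to dbutils.notebook.exit.
    return dbutils.notebook.run("/Jobs/child_notebook", 3600, params)

with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(run_child, param_sets))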

Guess we have to live with that till Spark comes up with something in this area.
