12-10-2022 07:31 AM
We are trying to improve our overall runtime by running queries in parallel using either multiprocessing or threads. What I am seeing, though, is that when the function that runs this code is executed in a separate process, it doesn't return a DataFrame with any info.
Sample Code
def GetData(job_context, group_numbers):
    sqlquery = """
        SELECT *
        FROM recon.v_ods_payment
        WHERE group_number IN ({0})""".format(group_numbers)
    payments_df = job_context.create_dataframe_from_staging(sqlquery)
    print(payments_df)
    # display(payments_df)
    return payments_df
def DoWork():
    job_list = ["payments.payments_job"]
    processes = []
    for job in job_list:
        try:
            p = multiprocessing.Process(target=GetData, args=(job_context, "0030161"))
            p.start()
            processes.append(p)
        except Exception as ex:
            status = "CL_Failed"
            message = f"The job {job} failed due to an exception. Please refer to Databricks for more information. Error: {repr(ex)}"
            job_context.logger_error(message)
            raise ex
    for p in processes:
        p.join()
The error I can see in the logs is as follows
File "<command-2328844487947527>", line 13, in GetData
print(payments_df)
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 452, in __repr__
return "DataFrame[%s]" % (", ".join("%s: %s" % c for c in self.dtypes))
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 1040, in dtypes
return [(str(f.name), f.dataType.simpleString()) for f in self.schema.fields]
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 257, in schema
self._schema = _parse_datatype_json_string(self._jdf.schema().json())
File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/databricks/spark/python/pyspark/sql/utils.py", line 127, in deco
return f(*a, **kw)
File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 332, in get_return_value
format(target_id, ".", name, value))
py4j.protocol.Py4JError: An error occurred while calling o1361.schema. Trace:
py4j.Py4JException: Method schema([]) does not exist
This runs totally fine when not running in a separate process.
Any ideas or help would be greatly appreciated.
12-10-2022 01:30 PM
Spark, by default, works in a distributed way and runs the processing in parallel using native Spark.
It will use all cores; every core will process one partition and write it to the database.
The same applies to select.
So, for example, you can use the code below to read from SQL in parallel:
table = (spark.read
    .format("jdbc")
    .option("url", "<jdbc_url>")
    .option("dbtable", "<table_name>")
    .option("user", "<username>")
    .option("password", "<password>")
    # a column with a uniformly distributed range of values that can be used for parallelization
    .option("partitionColumn", "<partition_key>")
    # lowest value of partitionColumn to pull data for
    .option("lowerBound", "<min_value>")
    # highest value of partitionColumn to pull data for
    .option("upperBound", "<max_value>")
    # number of partitions to distribute the data into; use sc.defaultParallelism to match the number of cores
    .option("numPartitions", sc.defaultParallelism)
    .load()
)
12-10-2022 03:57 PM
How would I achieve this if I wanted to run queries in parallel for different tables?
Like:
DF = select * from table a where group = 1
DF2 = select * from table b where group = 1
12-11-2022 02:28 AM
@Peter Vanelli
Check if this helps
https://medium.com/analytics-vidhya/horizontal-parallelism-with-pyspark-d05390aa1df5
12-12-2022 08:51 PM
This might help https://dustinvannoy.com/2022/05/06/parallel-ingest-spark-notebook/
Write a function that passes your table list, as an array, into a queue for threading.
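Something like the sketch below, assuming the built-in spark session of a Databricks notebook; the second table name and the hard-coded group number are only placeholders for illustration:
import threading
from queue import Queue, Empty

# placeholder list of tables to query in parallel
tables = ["recon.v_ods_payment", "recon.v_ods_claims"]

results = {}
work_queue = Queue()
for t in tables:
    work_queue.put(t)

def worker():
    # each thread pulls table names off the queue and runs the query;
    # threads share the notebook's SparkSession, so nothing is forked
    while True:
        try:
            table_name = work_queue.get_nowait()
        except Empty:
            return
        df = spark.sql(
            "SELECT * FROM {0} WHERE group_number = '0030161'".format(table_name)
        )
        # count() is just a cheap action to force execution here; in practice
        # you would write the DataFrame out (or cache it) inside the thread
        results[table_name] = df.count()
        work_queue.task_done()

threads = [threading.Thread(target=worker) for _ in range(4)]
for th in threads:
    th.start()
for th in threads:
    th.join()

print(results)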
04-08-2024 09:37 PM
Thanks for the links mentioned above. But both of them use raw Python to achieve parallelism. Does this mean Spark (read: PySpark) makes no provision for parallel execution of functions or even notebooks?
We used a wrapper notebook with ThreadPoolExecutor to get our job done. This one is purely Python: the wrapper notebook spawns the same child notebook with different parameters, and the child notebooks use Spark DataFrames and SQL. It is running, and all I can say is that we are not exactly unhappy with it. It helped even more when we scaled our environment horizontally: four clusters, each loading around 200-250 files to Delta Lake, in total around 24 GB of data within 2-3 hours, in incremental fashion.
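For reference, a rough sketch of that wrapper pattern, assuming it runs in a Databricks notebook where dbutils is available; the child notebook path and the parameter names are made up for illustration:
from concurrent.futures import ThreadPoolExecutor

# hypothetical parameter sets, one per child-notebook run
param_sets = [
    {"source_path": "/mnt/raw/payments", "load_type": "incremental"},
    {"source_path": "/mnt/raw/claims", "load_type": "incremental"},
]

def run_child(params):
    # dbutils.notebook.run blocks until the child notebook finishes and
    # returns whatever the child passes to dbutils.notebook.exit()
    return dbutils.notebook.run("/Workspace/jobs/load_to_delta", 3600, params)

# run up to four child notebooks concurrently on the same cluster
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(run_child, param_sets))

print(results)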
Guess we have to live with that till Spark comes up with something in this area.