Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

using concurrent.futures for parallelization

ironv
New Contributor

Hi, I am trying to copy a table with billions of rows from an enterprise data source into my Databricks table. To do this, I need to use a homegrown library which handles auth etc., runs the query and returns a dataframe. I am partitioning the table using ~100 queries. The code runs perfectly when I run sequentially (setting `NUM_PARALLEL=1` below). However, even with `NUM_PARALLEL=2` it sometimes runs fine (as in, things are running in parallel), but the rest of the time I keep getting the exception `SparkSession$ does not exist in the JVM`. There seem to be no errors thrown either on the enterprise data source side or by the other library (for which I can see the logs too). Thx!

# NOTE: prm (config dict), spark, ud (the homegrown data-source library) and
# suppress_stdout_stderr() are defined elsewhere in the notebook.
import os
import random
import shutil
import time
from datetime import datetime

def process_row(row):
  st = time.time()
  tmpdir = f"/dbfs{prm['TMP_DIR']}/{row.table_name}/{row.query_id}"
  for trial in range(1, prm["MAX_TRIES"] + 1):
    try:
      os.makedirs(f"{tmpdir}/{trial}", exist_ok=True)
      with suppress_stdout_stderr():
        sdf = ud.getDataFrame(query=row.query, save_to_dir=f"{tmpdir}/{trial}").get_spark_df()

      if sdf:
        sdf.write.mode("append").saveAsTable(f"{prm['DATABASE']}.{row.table_name}")
        cnt_rec = sdf.count()
      else:
        cnt_rec = 0

      # mark this partition/query as completed in the tracking table
      qry = f"UPDATE {prm['DATABASE']}._extract SET completed = '{datetime.now()}' WHERE query_id = '{row.query_id}'"
      spark.sql(qry)

      print(" {:,} ({:.1f}min) ".format(cnt_rec, (time.time() - st) / 60.), end=" ", flush=True)
      break
    except Exception as e:
      print(e)
      print(f" (retrying {trial}) ", end=" ", flush=True)
      time.sleep(trial * random.randint(30, 60))

  time.sleep(5)
  shutil.rmtree(tmpdir, ignore_errors=True)

Code which is calling the above:

import itertools
from concurrent.futures import ProcessPoolExecutor, as_completed

# queries_lst is assumed to be sorted by table_name so groupby yields each table once
for tbl, rows_iter in itertools.groupby(queries_lst, key=lambda x: x.table_name):
  rows_lst = list(rows_iter)
  print(f"table: {tbl}  partitions: {len(rows_lst):,}", flush=True)

  st0 = time.time()
  if prm["NUM_PARALLEL"] > 1:
    # run the per-partition extracts in parallel worker processes
    with ProcessPoolExecutor(max_workers=prm["NUM_PARALLEL"]) as executor:
      job_list = [executor.submit(process_row, row) for row in rows_lst]
      for job in as_completed(job_list):
        pass
  else:
    # sequential fallback
    for row in rows_lst:
      process_row(row)

  print(f"\n  time: {(time.time()-st0)/60.:.1f}min\n", flush=True)

1 REPLY

mark_ott
Databricks Employee

The "SparkSession$ does not exist in the JVM" error in your scenario is almost always due to the use of multiprocessing (like ProcessPoolExecutor) with Spark. Spark contexts and sessions cannot safely be shared across processes, especially in Databricks or PySpark environments, because the JVM and SparkContext are not fork-safe and cannot be serialized and sent to child processes reliably.

Why This Happens

  • SparkSession is JVM-bound: Each process launched by ProcessPoolExecutor spins up its own Python interpreter and tries to access the SparkSession that was created in the parent. This won't work; the session is tied to the driver JVM through a Py4J gateway and cannot be forked or copied into a child process.

  • PySpark/Databricks does not support multiprocessing with Python's multiprocessing or ProcessPoolExecutor for Spark jobs. Each worker process would have to create its own SparkSession, which leads to resource contention, failures, and the SparkSession$ error you're seeing.

  • Sequential operation works because only one Python process and the main Spark JVM are used; no parallelism implies no cross-process sharing issues.

How to Fix

Use thread-based, not process-based parallelism:

  • Replace ProcessPoolExecutor with ThreadPoolExecutor. Spark is not forkable but can tolerate concurrent threads within the same driver process — provided thread safety is handled.

  • Alternatively, use Spark's own parallelism through DataFrame partitioning or mapPartitions.

Example Fix

from concurrent.futures import ThreadPoolExecutor, as_completed

...

if prm["NUM_PARALLEL"] > 1:
  with ThreadPoolExecutor(max_workers=prm["NUM_PARALLEL"]) as executor:
    job_list = [executor.submit(process_row, row) for row in rows_lst]
    for job in as_completed(job_list):
      pass
else:
  for row in rows_lst:
    process_row(row)

This should eliminate the "SparkSession$" error because all threads share the same process, JVM, and Spark context.
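
One optional refinement (a sketch of mine, not part of the original suggestion): concurrent.futures only re-raises a worker's exception when you call result() on the future, so replacing the bare pass with an explicit result() call surfaces on the driver any error that escapes process_row. As written, process_row catches its own exceptions, so this only matters if you change it to re-raise after exhausting retries.

from concurrent.futures import ThreadPoolExecutor, as_completed

with ThreadPoolExecutor(max_workers=prm["NUM_PARALLEL"]) as executor:
  job_list = [executor.submit(process_row, row) for row in rows_lst]
  for job in as_completed(job_list):
    try:
      # result() re-raises any exception raised inside process_row;
      # with a bare `pass`, failures are silently swallowed by the future
      job.result()
    except Exception as e:
      print(f"partition failed: {e}", flush=True)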

Important Caveats:

  • Make sure your homegrown library and all Spark interactions are thread-safe.

  • Limit the level of parallelism (NUM_PARALLEL) to not overwhelm the Spark driver.

  • Consider possible GIL limitations: threads share Python's GIL, so heavy pure-Python (non-Spark) work inside process_row will not speed up, although time spent waiting on Spark/JVM calls is largely unaffected.

Alternatives

  • Leverage Spark native partitioning: Run a single distributed Spark job that pulls data in partitions (see the sketch after this list).

  • Use Databricks workflows: For large-scale orchestrations, Databricks Jobs can run tasks in parallel safely.
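
For the first alternative, here is a minimal sketch of a Spark-native partitioned read, assuming the enterprise source is reachable over JDBC. The URL, table, secret scope, and partition bounds below are placeholders; if your homegrown library is the only supported access path, this pattern may not apply.

# Hypothetical JDBC connection; all option values below are placeholders.
df = (
  spark.read.format("jdbc")
  .option("url", "jdbc:postgresql://source-host:5432/warehouse")
  .option("dbtable", "schema.big_table")
  .option("user", dbutils.secrets.get("my_scope", "source_user"))
  .option("password", dbutils.secrets.get("my_scope", "source_password"))
  .option("partitionColumn", "id")       # numeric/date column used to split the read
  .option("lowerBound", "1")
  .option("upperBound", "1000000000")
  .option("numPartitions", "100")        # roughly equivalent to the ~100 queries above
  .load()
)

df.write.mode("append").saveAsTable(f"{prm['DATABASE']}.big_table")

With this approach Spark issues the ~100 partitioned queries itself across the cluster, so no driver-side thread or process pool is needed.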



Switching to ThreadPoolExecutor will let your parallelism work without causing SparkSession errors. For industrial-scale data loads, native Spark parallelism is preferable.