using concurrent.futures for parallelization

ironv
New Contributor

Hi, I'm trying to copy a table with billions of rows from an enterprise data source into my Databricks table. To do this, I need to use a homegrown library which handles auth etc., runs the query, and returns a DataFrame. I am partitioning the table using ~100 queries. The code runs perfectly when I run it sequentially (setting `NUM_PARALLEL=1` below). However, even with `NUM_PARALLEL=2`, it sometimes runs fine (things do run in parallel), but the rest of the time I keep getting the exception `SparkSession$ does not exist in the JVM`. There seem to be no errors thrown on the enterprise data source side or by the other library (whose logs I can also see). Thx!
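For context, each element of `queries_lst` (used in the code below) is a row carrying `table_name`, `query_id`, and `query` fields. A minimal sketch of how such a list might be materialized, assuming the `_extract` bookkeeping table holds one row per partition query (illustrative only):

queries_lst = spark.sql(
    f"SELECT table_name, query_id, query FROM {prm['DATABASE']}._extract "
    f"WHERE completed IS NULL ORDER BY table_name"  # itertools.groupby below needs sorted input
).collect()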

import os
import random
import shutil
import time
from datetime import datetime

def process_row(row):
  # prm, spark, ud and suppress_stdout_stderr come from the surrounding notebook
  st = time.time()
  tmpdir = f"/dbfs{prm['TMP_DIR']}/{row.table_name}/{row.query_id}"
  for trial in range(1, prm["MAX_TRIES"]+1):
    try:
      os.makedirs(f"{tmpdir}/{trial}", exist_ok=True)
      # homegrown library: runs the query and hands back a Spark DataFrame
      with suppress_stdout_stderr():
        sdf = ud.getDataFrame(query=row.query, save_to_dir=f"{tmpdir}/{trial}").get_spark_df()

      if sdf is not None:
        sdf.write.mode("append").saveAsTable(f"{prm['DATABASE']}.{row.table_name}")
        cnt_rec = sdf.count()
      else:
        cnt_rec = 0

      # mark this partition query as completed in the bookkeeping table
      qry = f"UPDATE {prm['DATABASE']}._extract SET completed = '{datetime.now()}' WHERE query_id = '{row.query_id}'"
      spark.sql(qry)

      print(" {:,} ({:.1f}min) ".format(cnt_rec, (time.time()-st)/60.), end=" ", flush=True)
      break
    except Exception as e:
      print(e)
      print(f" (retrying {trial}) ", end=" ", flush=True)
      time.sleep(trial * random.randint(30, 60))  # backoff grows with each retry

  time.sleep(5)
  shutil.rmtree(tmpdir, ignore_errors=True)

Code which is calling the above:

import itertools
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

for tbl, rows_iter in itertools.groupby(queries_lst, key=lambda x: x.table_name):
  rows_lst = list(rows_iter)
  print(f"table: {tbl}  partitions: {len(rows_lst):,}", flush=True)

  st0 = time.time()
  if prm["NUM_PARALLEL"] > 1:
    with ProcessPoolExecutor(max_workers=prm["NUM_PARALLEL"]) as executor:
      job_list = [executor.submit(process_row, row) for row in rows_lst]
      for job in as_completed(job_list):
        job.result()  # block until done and surface any worker exception

  else:
    for row in rows_lst:
      process_row(row)

  print(f"\n  time: {(time.time()-st0)/60.:.1f}min\n", flush=True)
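For comparison, a thread-based variant of the same fan-out, as a minimal sketch: `ThreadPoolExecutor` threads run inside the driver process and share its `SparkSession` and JVM gateway, whereas `ProcessPoolExecutor` forks worker processes that do not inherit the py4j connection to the driver JVM, which is consistent with the `SparkSession$ does not exist in the JVM` error.

from concurrent.futures import ThreadPoolExecutor, as_completed

# threads run in the driver process, so process_row can keep using `spark`
with ThreadPoolExecutor(max_workers=prm["NUM_PARALLEL"]) as executor:
  job_list = [executor.submit(process_row, row) for row in rows_lst]
  for job in as_completed(job_list):
    job.result()  # re-raise any exception from the worker thread

Spark itself accepts concurrent job submission from multiple driver threads, so the fan-out pattern stays the same.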

