using concurrent.futures for parallelization
02-19-2025 04:19 PM
Hi, I'm trying to copy a table with billions of rows from an enterprise data source into my Databricks table. To do this, I need to use a homegrown library which handles auth etc., runs the query, and returns a DataFrame. I am partitioning the table across ~100 queries. The code runs perfectly when I run it sequentially (setting `NUM_PARALLEL=1` below). However, even with `NUM_PARALLEL=2`, it sometimes runs fine (things really do run in parallel), but the rest of the time I keep getting the exception `SparkSession$ does not exist in the JVM`. There are no errors thrown on the enterprise data source side or by the other library (for which I can see the logs too). Thx!
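For context, `prm` is just a dict of run parameters and `queries_lst` is a list of rows with `table_name`, `query_id`, and `query` fields. Roughly like this (the values below are made up; the real list comes from our metadata, not hard-coded):

from collections import namedtuple

QueryRow = namedtuple("QueryRow", ["table_name", "query_id", "query"])

prm = {
    "NUM_PARALLEL": 2,          # 1 = sequential, >1 = pool size
    "MAX_TRIES": 3,             # retries per partition query
    "TMP_DIR": "/tmp/extract",  # staging dir under /dbfs
    "DATABASE": "my_db",        # target database
}

queries_lst = [
    QueryRow("big_table", "q0001", "SELECT * FROM src.big_table WHERE part = 1"),
    QueryRow("big_table", "q0002", "SELECT * FROM src.big_table WHERE part = 2"),
    # ... ~100 of these per table
]

And here is the worker function itself: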
import os, random, shutil, time
from datetime import datetime

# ud (the homegrown extract library), prm, spark, and suppress_stdout_stderr()
# are defined earlier in the notebook.

def process_row(row):
    st = time.time()
    tmpdir = f"/dbfs{prm['TMP_DIR']}/{row.table_name}/{row.query_id}"
    for trial in range(1, prm["MAX_TRIES"] + 1):
        try:
            os.makedirs(f"{tmpdir}/{trial}", exist_ok=True)
            with suppress_stdout_stderr():
                sdf = ud.getDataFrame(query=row.query, save_to_dir=f"{tmpdir}/{trial}").get_spark_df()
            if sdf:
                sdf.write.mode("append").saveAsTable(f"{prm['DATABASE']}.{row.table_name}")
                cnt_rec = sdf.count()
            else:
                cnt_rec = 0
            # mark this partition query as completed
            qry = f"UPDATE {prm['DATABASE']}._extract SET completed = '{datetime.now()}' WHERE query_id = '{row.query_id}'"
            spark.sql(qry)
            print(" {:,} ({:.1f}min) ".format(cnt_rec, (time.time() - st) / 60.), end=" ", flush=True)
            break
        except Exception as e:
            print(e)
            print(f" (retrying {trial}) ", end=" ", flush=True)
            time.sleep(trial * random.randint(30, 60))
    time.sleep(5)
    shutil.rmtree(tmpdir, ignore_errors=True)
The code that calls the above:
import itertools
from concurrent.futures import ProcessPoolExecutor, as_completed

for tbl, rows_iter in itertools.groupby(queries_lst, key=lambda x: x.table_name):
    rows_lst = list(rows_iter)
    print(f"table: {tbl} partitions: {len(rows_lst):,}", flush=True)
    st0 = time.time()
    if prm["NUM_PARALLEL"] > 1:
        with ProcessPoolExecutor(max_workers=prm["NUM_PARALLEL"]) as executor:
            job_list = [executor.submit(process_row, row) for row in rows_lst]
            for job in as_completed(job_list):  # just wait for all partitions to finish
                pass
    else:
        for row in rows_lst:
            process_row(row)
    print(f"\n time: {(time.time()-st0)/60.:.1f}min\n", flush=True)
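Since the error mentions the JVM, I'm wondering whether the problem is that the SparkSession only exists in the driver process and so isn't available inside the child processes that ProcessPoolExecutor spawns. Would switching to threads be the right fix? A minimal sketch of the variant I have in mind, with only the executor swapped (plus job.result() so failures surface):

from concurrent.futures import ThreadPoolExecutor, as_completed

# Threads share the driver's SparkSession, so nothing Spark-related has to
# cross a process boundary.
with ThreadPoolExecutor(max_workers=prm["NUM_PARALLEL"]) as executor:
    job_list = [executor.submit(process_row, row) for row in rows_lst]
    for job in as_completed(job_list):
        job.result()  # re-raise any exception from process_row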
Labels: Spark