Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

My whole code is running on the driver node. I want it to run on the worker nodes so that the driver's memory is not exhausted. How can I improve my code? My Spark job crashes frequently when the data pulled from S3 is large.


I am running a process that has four steps:

  1. Query S3 file paths from DynamoDB based on parameters given by the user (the client provides the function for this; I just import it). It returns a list of files.
  2. Check whether those file paths have already been queried, get the distinct files, and append them to a files Delta table.
  3. Fetch the data from the S3 file paths queried earlier (the client provides this function too; I import it and pass the file path as a parameter). It returns a list of objects whose key is 'timestamp' and whose value is a 'pd.DataFrame'.
  4. Concatenate the DataFrames from all the objects in the list and append the result to a DataFrame Delta table.
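Step 2's check can be pinned down with a small plain-Python sketch of the intended semantics: keep a candidate only if its `(id, wmp_file_path)` pair has not been recorded before, and keep each pair once. The tuple layout `(uuid, start_date, end_date, id, wmp_file_path)` follows the column list in `get_distinct_wmp_files`; the function name `new_files_only` and the sample data are hypothetical.

```python
def new_files_only(candidates, already_processed):
    """Return candidate rows whose (id, wmp_file_path) key is not already processed.

    candidates: list of (uuid, start_date, end_date, id, wmp_file_path) tuples
    already_processed: iterable of (id, wmp_file_path) keys
    """
    seen = set(already_processed)
    result = []
    for row in candidates:
        key = (row[3], row[4])  # (id, wmp_file_path)
        if key not in seen:
            result.append(row)
            seen.add(key)  # also drops duplicates within this batch ("distinct files")
    return result


# Hypothetical sample data: one file already processed, one new file listed twice
candidates = [
    ("u1", "2023-01-01", "2023-01-02", 7, "7/1672531200.wmp"),
    ("u1", "2023-01-01", "2023-01-02", 7, "7/1672534800.wmp"),
    ("u1", "2023-01-01", "2023-01-02", 7, "7/1672534800.wmp"),
]
processed = [(7, "7/1672531200.wmp")]
result = new_files_only(candidates, processed)
print(result)
```

In Spark, the same filter could be expressed as a `left_anti` join, which avoids writing a per-notebook temporary table: `dataframe.join(spark.table("wmp_metadata_partitioned").select("id", "wmp_file_path"), on=["id", "wmp_file_path"], how="left_anti").dropDuplicates(["id", "wmp_file_path"])`.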


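import time
import botocore.exceptions
import pandas as pd

def querying_dynamodb(start_date, end_date):
  # env, id and query_uuid are notebook-level globals; perform_multipart_accel_data_query is the client-provided query function.
  wmp_file_meta_data = []
  query_timestamp1 = time.time()
  query_resp = perform_multipart_accel_data_query(env, id, start_date, end_date)
  if len(query_resp) == 0:
    # Nothing matched the parameters, so there is nothing to process.
    # dbutils.notebook.exit("True")
    return []
  # Traverse the items returned by DynamoDB and collect the WMP metadata. This list is used to avoid data overlapping.
  for response in query_resp:
    id_new = int(response["id"]['N'])
    wmp_file_path = response['file_path']['S']
    accel_data_file_path = response['accel_data']['S']
    ts = response['timestamp']['N']
    # Row layout matches the columns used in get_distinct_wmp_files.
    wmp_file_meta_data.append((query_uuid, start_date, end_date, id_new, wmp_file_path))
  query_timestamp2 = time.time()
  query_difference = query_timestamp2 - query_timestamp1
  return wmp_file_meta_data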
def get_distinct_wmp_files(wmp_file_meta_data):
  columns = ["uuid", "start_date", "end_date", "id", "wmp_file_path"]
  dataframe = spark.createDataFrame(wmp_file_meta_data, columns)
  table_name = 'wmp_metadata_temp_' + str(id)
  # Save the DataFrame as a per-notebook Delta table so it can be queried with SQL.
  dataframe.write.format("delta").mode("overwrite").saveAsTable(table_name)
  # Keep only files not already recorded in wmp_metadata_partitioned (distinct WMP files, avoiding data overlapping).
  new_wmp_files = spark.sql(
    f"SELECT * FROM {table_name} WHERE NOT EXISTS ("
    f"SELECT 1 FROM wmp_metadata_partitioned "
    f"WHERE {table_name}.id = wmp_metadata_partitioned.id "
    f"AND {table_name}.wmp_file_path = wmp_metadata_partitioned.wmp_file_path)"
  )
  return new_wmp_files
def convert_to_pitd_wout_collect(wmp_file_meta_data):
    """Convert the WMP files to PITD objects.

    wmp_file_meta_data: list of WMP file paths
    returns: list of PITD objects
    """
    print("Converting to PITD")
    pitd_objects = []
    pitd_timestamp1 = time.time()
    for files in wmp_file_meta_data:
        # Fetch the PITD object for each file from S3 and save it to the list.
        # wmp_file_path = files['wmp_file_path']
        ts = int(files.split('/')[1].split('.')[0])
        try:
            # Client-provided fetch; the original argument list was truncated, so only the file path is passed here.
            pitd = get_pitd_for_file_path(files)
            pitd_objects.append(pitd)
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                print("The object does not exist.")
    pitd_timestamp2 = time.time()
    pitd_difference = pitd_timestamp2 - pitd_timestamp1
    print("PITD conversion successful!")
    return pitd_objects
def process_pitd_objects(time_data):
    # time_data maps a section key (a numeric string) to that section's column data.
    section_frames = []
    for section in time_data:
        try:
            td_df = pd.DataFrame.from_dict(time_data[section])
            td_df.index = td_df.index + int(section)
            td_df[['a1', 'a2', 'a3', 'roll', 'pitch']] = td_df[['a1', 'a2', 'a3', 'roll', 'pitch']].astype('float64')
            section_frames.append(td_df)
        except Exception as e:
            print(f"Skipping section {section}: {e}")
    if section_frames:
        complete_frame = pd.concat(section_frames)
        complete_frame["index"] = complete_frame.index
        complete_frame["id"] = id  # notebook-level global
        complete_frame["uuid"] = query_uuid  # notebook-level global
        return complete_frame
    return None
def process_dataframes(pitd_objects, id, uuid):
  if not pitd_objects:
    return None
  print("Processing PITD objects...")
  pitd_objects_rdd = sc.parallelize(pitd_objects)
  # Build one pandas DataFrame per PITD object on the executors; drop objects that produced no frame.
  section_frames_rdd = pitd_objects_rdd.map(process_pitd_objects).filter(lambda df: df is not None)
  print("Processing completed. Concatenating dataframes....")
  # Note: treeReduce returns the fully concatenated pandas DataFrame to the driver.
  result_df = section_frames_rdd.treeReduce(lambda x, y: pd.concat([x, y]))
  print("Concatenation completed. Dumping to delta table...")
  spark_df = spark.createDataFrame(result_df)
  status = dump_accel_data(spark_df)
  if status:
    print("Dumped successfully...")
  else:
    print("Dumping failure.")
  return spark_df
wmp_files_paths = querying_dynamodb(start_date, end_date)
new_wmp_files = get_distinct_wmp_files(wmp_files_paths) # returns a pyspark dataframe
wmp_file_list = new_wmp_files.rdd.map(lambda x: x.wmp_file_path).collect() # convert pyspark dataframe column (wmp_file_path) to a list
pitd_objects = convert_to_pitd_wout_collect(wmp_file_list)
process_dataframes(pitd_objects, id, query_uuid)
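To see what `process_pitd_objects` produces, here is a self-contained variant run on a small hypothetical `time_data` dict. It assumes each section key is an integer offset stored as a string and each value is a column-oriented dict, which is consistent with how the function calls `pd.DataFrame.from_dict` and `int(section)`. The name `build_frame` is hypothetical; it takes `id_value` and `query_uuid` as parameters instead of reading notebook globals, which makes its inputs explicit when it runs on executors, and it omits the original try/except for brevity.

```python
import pandas as pd

NUMERIC_COLS = ["a1", "a2", "a3", "roll", "pitch"]


def build_frame(time_data, id_value, query_uuid):
    """Concatenate the per-section frames of one PITD object into a single DataFrame."""
    section_frames = []
    for section, columns in time_data.items():
        td_df = pd.DataFrame.from_dict(columns)
        # Offset the row index by the section key, as in the original function.
        td_df.index = td_df.index + int(section)
        td_df[NUMERIC_COLS] = td_df[NUMERIC_COLS].astype("float64")
        section_frames.append(td_df)
    if not section_frames:
        return None
    complete_frame = pd.concat(section_frames)
    complete_frame["index"] = complete_frame.index
    complete_frame["id"] = id_value
    complete_frame["uuid"] = query_uuid
    return complete_frame


# Hypothetical PITD object with two sections of two samples each
time_data = {
    "0": {"a1": [1, 2], "a2": [3, 4], "a3": [5, 6], "roll": [0, 0], "pitch": [1, 1]},
    "100": {"a1": [7, 8], "a2": [9, 10], "a3": [11, 12], "roll": [0, 0], "pitch": [1, 1]},
}
frame = build_frame(time_data, id_value=42, query_uuid="example-uuid")
print(frame)
```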

This whole code is in a notebook, and many (hundreds of) instances of the notebook run in parallel through a Python ThreadPoolExecutor. My Spark job crashes when there is too much data. How can I improve the code?
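With hundreds of notebooks launched from one thread pool, the driver-side work of every notebook (the DynamoDB query, the S3 fetch loop in `convert_to_pitd_wout_collect`, and the DataFrame returned by `treeReduce`) lands on the same driver node at the same time. One possible mitigation is to cap how many run concurrently. A minimal sketch, where `run_notebook` is a hypothetical stand-in for whatever each thread launches (on Databricks that would typically be `dbutils.notebook.run(path, timeout_seconds, arguments)`) and the limit is an arbitrary value to tune against driver memory:

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def run_notebook(params):
    # Hypothetical stand-in: on Databricks this would call something like
    # dbutils.notebook.run("/path/to/notebook", 3600, params)
    return f"done {params['start_date']}..{params['end_date']}"


def run_all(param_list, max_concurrent=8):
    """Run one notebook per parameter set, with at most max_concurrent running at a time."""
    results, failures = {}, {}
    with ThreadPoolExecutor(max_workers=max_concurrent) as pool:
        futures = {pool.submit(run_notebook, p): i for i, p in enumerate(param_list)}
        for future in as_completed(futures):
            i = futures[future]
            try:
                results[i] = future.result()
            except Exception as exc:  # record the failure and keep going
                failures[i] = exc
    return results, failures


params = [{"start_date": f"2023-01-{d:02d}", "end_date": f"2023-01-{d + 1:02d}"} for d in range(1, 11)]
results, failures = run_all(params, max_concurrent=4)
print(len(results), len(failures))
```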

Cluster details: Driver: i3.4xlarge · Workers: c4.4xlarge · 4-8 workers · On-demand and Spot · fall back to On-demand · 11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12) · us-east-1a (12-20 DBU)



@uzair mustafa: Here is a framework to think about, based on parallelizing the S3 file processing, Spark's caching capabilities, and Delta Lake's time travel capabilities. Please see if it helps you get started.

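from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
from delta.tables import DeltaTable
# Define the schema for the S3 files
schema = StructType([
  StructField("timestamp", StringType()),
  StructField("data", StringType())
])
# Read the list of S3 file paths from DynamoDB (read_file_paths_from_dynamo_db is a placeholder for the client function)
file_paths = read_file_paths_from_dynamo_db()
# Filter out the file paths that have already been processed
delta_table = DeltaTable.forPath(spark, "delta_table_path")
processed_files = delta_table.toDF().select("file_path").distinct()
# Collect the processed paths once (assumes they fit in driver memory); "in" does not work against a Spark DataFrame
processed_set = {row.file_path for row in processed_files.collect()}
file_paths = [f for f in file_paths if f not in processed_set]
# Parallelize the file processing: one partition per file, so the executors read the files
file_paths_rdd = sc.parallelize(file_paths, max(len(file_paths), 1))
# read_data_from_s3 (client-provided) runs on the executors and must return a value Spark can store in a column
data_rdd = file_paths_rdd.map(lambda f: (f, read_data_from_s3(f)))
data_df = data_rdd.toDF(["file_path", "data"])  # the .select(...) projection was cut off in the original post
# Write the data to the Delta table
data_df.write.format("delta").mode("append").save("delta_table_path")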

In this example, we read the list of S3 file paths from DynamoDB and filter out the files that have already been processed by querying a Delta table. Then, we parallelize the file processing using Spark's parallelize method and map function. Finally, we write the results to a Delta table using the write method. Note that the read_data_from_s3 function is assumed to be provided by the client, and should be modified to process the S3 data in parallel.
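The key difference from the question's code is where the S3 reads happen: in `convert_to_pitd_wout_collect` they run in a Python loop on the driver, and `treeReduce` then brings every pandas DataFrame back to the driver. A sketch of the alternative, assuming the fetch function can be imported on the workers and returns a pandas DataFrame, is a per-partition generator that fetches each file on the executor and emits plain row tuples, so Spark can build a distributed DataFrame without the driver holding the data. `rows_from_paths`, `fake_fetch`, and the column list are hypothetical; `fake_fetch` exists only so the sketch runs locally.

```python
import pandas as pd

COLUMNS = ["file_path", "a1", "a2", "a3", "roll", "pitch"]


def rows_from_paths(paths, fetch):
    """Yield one plain tuple per sample for every file path in a partition.

    Intended to be passed to rdd.mapPartitions, so each executor fetches and
    flattens only its own files instead of returning pandas objects to the driver.
    """
    for path in paths:
        df = fetch(path)  # assumed to return a pandas DataFrame
        for rec in df[COLUMNS[1:]].itertuples(index=False):
            yield (path, *(float(v) for v in rec))


def fake_fetch(path):
    # Hypothetical stand-in for the client's S3 fetch function
    return pd.DataFrame({"a1": [1, 2], "a2": [3, 4], "a3": [5, 6], "roll": [0, 0], "pitch": [1, 1]})


rows = list(rows_from_paths(iter(["7/1.wmp", "7/2.wmp"]), fake_fetch))
print(rows[0])
```

On the cluster, the generator would be wired in roughly as `spark.createDataFrame(sc.parallelize(paths, num_slices).mapPartitions(lambda it: rows_from_paths(it, fetch)), schema)` with a `schema` matching `COLUMNS`, followed by a Delta append. Since the question's client function returns a list of timestamp-keyed objects rather than one DataFrame, `fetch` would need a small adapter around it.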


