03-10-2023 03:20 AM
I am running a process which has 4 steps.
Code:
def querying_dynamodb(start_date, end_date):
    pitd_objects = []
    wmp_file_meta_data = []
    query_timestamp1 = time.time()
    query_resp = perform_multipart_accel_data_query(env, id, start_date, end_date)
    # Traverse the data queried from DynamoDB and collect wmp_metadata into a list.
    # This list is later used to avoid overlapping data.
    if len(query_resp) != 0:
        for response in query_resp:
            id_new = int(response["id"]['N'])
            wmp_file_path = response['file_path']['S']
            accel_data_file_path = response['accel_data']['S']
            ts = response['timestamp']['N']
            wmp_file_meta_data.append((query_uuid, start_date, end_date, id_new, wmp_file_path))
    else:
        return ([], '')
        # dbutils.notebook.exit("True")
    query_timestamp2 = time.time()
    query_difference = query_timestamp2 - query_timestamp1
    return wmp_file_meta_data
def get_distinct_wmp_files(wmp_file_meta_data):
    columns = ["uuid", "start_date", "end_date", "id", "wmp_file_path"]
    dataframe = spark.createDataFrame(wmp_file_meta_data, columns)
    table_name = 'wmp_metadata_temp_' + str(id)
    # Cache the dataframe and expose it as a temporary view for the anti-join below
    dataframe.persist(StorageLevel.MEMORY_AND_DISK)
    dataframe.createOrReplaceTempView(table_name)
    # Keep only the WMP files that are not already in wmp_metadata_partitioned -- avoids data overlapping
    new_wmp_files = spark.sql(
        "SELECT * FROM {0} WHERE NOT EXISTS ("
        "SELECT 1 FROM wmp_metadata_partitioned "
        "WHERE {0}.id = wmp_metadata_partitioned.id "
        "AND {0}.wmp_file_path = wmp_metadata_partitioned.wmp_file_path)".format(table_name)
    )
    return new_wmp_files
def convert_to_pitd_wout_collect(wmp_file_meta_data):
    """
    Convert the WMP files to PITD objects.
    Parameters:
        wmp_file_meta_data: list of WMP file paths
    Returns:
        list of PITD objects (as dicts)
    """
    print("Converting to PITD")
    pitd_objects = []
    pitd_timestamp1 = time.time()
    for file_path in wmp_file_meta_data:
        try:
            # Fetch the PITD object from S3 and append it to the list.
            ts = int(file_path.split('/')[1].split('.')[0])
            pitd = get_pitd_for_file_path(
                file_path=file_path,
                data_retriever=s3_dr,
                timestamp=ts,
                id=id,
            )
            pitd_objects.append(pitd.to_dict())
        except botocore.exceptions.ClientError as e:
            if e.response['Error']['Code'] == "404":
                print("The object does not exist.")
    pitd_timestamp2 = time.time()
    pitd_difference = pitd_timestamp2 - pitd_timestamp1
    print("PITD conversion successful!")
    return pitd_objects
def process_pitd_objects(time_data):
    section_frames = []
    for i, section in enumerate(time_data):
        try:
            td_df = pd.DataFrame.from_dict(time_data[section])
            td_df.index = td_df.index + int(section)
            td_df[['a1', 'a2', 'a3', 'roll', 'pitch']] = td_df[['a1', 'a2', 'a3', 'roll', 'pitch']].astype('float64')
            section_frames.append(td_df)
        except Exception:
            # Skip sections that cannot be parsed into a dataframe
            pass
    if section_frames:
        complete_frame = pd.concat(section_frames)
        complete_frame["index"] = complete_frame.index
        complete_frame["id"] = id
        complete_frame["uuid"] = query_uuid
        return complete_frame
def process_dataframes(pitd_objects, id, uuid):
    if pitd_objects:
        print("Processing PITD objects...")
        pitd_objects_rdd = sc.parallelize(pitd_objects)
        section_frames_rdd = pitd_objects_rdd.map(process_pitd_objects)
        print("Processing completed. Concatenating dataframes...")
        # Reduce the RDD of pandas dataframes into a single pandas dataframe
        result_df = section_frames_rdd.treeReduce(lambda x, y: pd.concat([x, y]))
        print("Concatenation completed. Dumping to delta table...")
        # Convert to a Spark dataframe once and reuse it
        spark_df = spark.createDataFrame(result_df)
        status = dump_accel_data(spark_df)
        if status:
            print("Dumped successfully...")
        else:
            print("Dumping failure.")
        return spark_df
wmp_files_paths = querying_dynamodb(start_date, end_date)
new_wmp_files = get_distinct_wmp_files(wmp_files_paths) # returns a pyspark dataframe
wmp_file_list = new_wmp_files.rdd.map(lambda x: x.wmp_file_path).collect() # convert pyspark dataframe column (wmp_file_path) to a list
pitd_objects = convert_to_pitd_wout_collect(wmp_file_list)
process_dataframes(pitd_objects, id, query_uuid)
This whole code is in a notebook, and multiple (read: hundreds of) instances of this notebook run in parallel through a ThreadPoolExecutor in Python. Spark crashes when the data volume gets too large. How can I improve the code?
Cluster details: Driver: i3.4xlarge · Workers: c4.4xlarge · 4-8 workers · On-demand and Spot · fall back to On-demand · 11.3 LTS (includes Apache Spark 3.3.0, Scala 2.12) · us-east-1a (12-20 DBU)
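For reference, the notebooks are launched roughly like this (the notebook path, arguments, and worker count below are placeholders, not the exact values I use):

from concurrent.futures import ThreadPoolExecutor

def run_notebook(params):
    # dbutils.notebook.run blocks until the child notebook finishes
    start_date, end_date, device_id = params
    return dbutils.notebook.run(
        "/path/to/this_notebook",   # placeholder notebook path
        0,                          # timeout_seconds; 0 = no timeout
        {"start_date": start_date, "end_date": end_date, "id": str(device_id)},
    )

with ThreadPoolExecutor(max_workers=100) as executor:
    # parameter_list holds hundreds of (start_date, end_date, id) tuples
    results = list(executor.map(run_notebook, parameter_list))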
03-13-2023 01:16 AM
@uzair mustafa: Here is a framework to get you started, based on parallelizing the S3 file processing, Spark's caching capabilities, and Delta Lake's time travel capabilities. Please see if this helps:
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType
from delta.tables import DeltaTable

# Define the schema for the S3 files
schema = StructType([
    StructField("timestamp", StringType()),
    StructField("data", StringType())
])

# Read the list of S3 file paths from DynamoDB
file_paths = read_file_paths_from_dynamo_db()

# Filter out the file paths that have already been processed
delta_table = DeltaTable.forPath(spark, "delta_table_path")
processed_files = {row.file_path for row in delta_table.toDF().select("file_path").distinct().collect()}
file_paths = [f for f in file_paths if f not in processed_files]

# Parallelize the file processing
# (read_data_from_s3 is assumed to return a record with "timestamp" and "data" fields)
file_paths_rdd = sc.parallelize(file_paths, len(file_paths))
data_rdd = file_paths_rdd.map(lambda f: (f, read_data_from_s3(f)))
data_df = data_rdd.toDF(["file_path", "data"]).select(
    col("file_path"),
    col("data.timestamp").alias("timestamp"),
    col("data.data").alias("data")
)

# Write the data to the Delta table
data_df.write.format("delta").mode("append").save("delta_table_path")
In this example, we read the list of S3 file paths from DynamoDB and filter out the files that have already been processed by querying a Delta table. Then we parallelize the file processing using Spark's parallelize method and a map over the resulting RDD. Finally, we write the results to a Delta table using the write method. Note that the read_data_from_s3 function is assumed to be provided on your side; it should read and parse a single S3 object, and the parallelism comes from mapping it over the RDD of file paths.
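For the Delta Lake time travel part mentioned above, a minimal sketch would look like the following (the table path and version number are placeholders, not values from your environment):

from delta.tables import DeltaTable

# Inspect the table history to find a version/timestamp to go back to
delta_table = DeltaTable.forPath(spark, "delta_table_path")
delta_table.history().select("version", "timestamp", "operation").show()

# Read the table as of an earlier version (or use "timestampAsOf" with a timestamp string)
old_snapshot = spark.read.format("delta").option("versionAsOf", 0).load("delta_table_path")

This lets you compare against, or restore, an earlier snapshot of the processed-files table if a batch goes wrong.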
03-31-2023 02:38 AM
Hi @uzair mustafa
Thank you for posting your question in our community! We are happy to assist you.
Does @Suteja Kanuri's answer help? If it does, would you be happy to mark it as best?
This will help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance!