09-23-2021 08:38 AM
I have PySpark code running on my local Mac, which has 6 cores and 16 GB of RAM. I run it in PyCharm as a first test.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import (
    ArrayType,
    MapType,
    StringType,
    StructField,
    StructType,
)

spark = (
    SparkSession.builder.appName("loc")
    .master("local[2]")
    .config("spark.driver.bindAddress", "localhost")
    .config("spark.driver.memory", "12g")
    .config("spark.sql.broadcastTimeout", "1500")
    .config("spark.scheduler.listenerbus.eventqueue.capacity", "20000")
    .config("spark.sql.shuffle.partitions", "1")
    .getOrCreate()
)

class CDATransformation:
    def __init__(self):
        pass
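
    # Preprocessing: make every EffectiveTime string end with a UTC "Z" suffix.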
    def preprocess_cda(self, cda_subset):
        print("***CDA PREPROCESSING***")
        return self._preprocess_cda(cda_subset)

    def _preprocess_cda(self, cda_subset):  # pragma: no cover
        df = cda_subset.withColumn(
            "EffectiveTime",
            F.when(
                cda_subset["EffectiveTime"].endswith("Z"), cda_subset["EffectiveTime"]
            ).otherwise(F.concat(cda_subset["EffectiveTime"], F.lit("Z"))),
        )
        return df
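
    # Step 1: parse the OrderHistory JSON string into an array of structs,
    # explode it to one row per lab order, and build a composite main_id key.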
    def process_orderh_step_1(self, cda_subset):
        schema = ArrayType(
            StructType(
                [
                    StructField(
                        "Codes", ArrayType(MapType(StringType(), StringType()))
                    ),
                    StructField(
                        "ComponentResults",
                        ArrayType(
                            StructType(
                                [
                                    StructField(
                                        "Codes",
                                        ArrayType(MapType(StringType(), StringType())),
                                    ),
                                    StructField("ComponentName", StringType()),
                                    StructField("EffectiveDateTime", StringType()),
                                    StructField("ResultValue", StringType()),
                                    StructField("ResultUnit", StringType()),
                                    StructField("ReferenceRange", StringType()),
                                    StructField("Status", StringType()),
                                ]
                            )
                        ),
                    ),
                    StructField("EffectiveDateTime", StringType()),
                    StructField("ReferenceId", StringType()),
                    StructField("Narrative", StringType()),
                    StructField("Impression", StringType()),
                ]
            )
        )
        df1 = (
            cda_subset.withColumn(
                "first_array",
                F.from_json(F.col("OrderHistory"), schema=schema),
            )
            .drop("OrderHistory")
            .withColumn("lab_array", F.explode_outer(F.col("first_array")))
            .drop("first_array")
            .withColumn(
                "main_id", F.concat_ws("#", F.col("EID"), F.col("EffectiveTime"))
            )
            .drop("EID", "EffectiveTime")
        )
        return df1
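
    # Step 2: alternative split of the explode/main_id tail of step 1;
    # expects a "first_array" column (not called by process_orderh below).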
    def process_orderh_step_2(self, cda_subset):
        df1 = cda_subset.withColumn(
            "lab_array", F.explode_outer(F.col("first_array"))
        ).drop("first_array")
        df2 = df1.withColumn(
            "main_id", F.concat_ws("#", F.col("EID"), F.col("EffectiveTime"))
        ).drop("EID", "EffectiveTime")
        return df2
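
    # Step 3: explode each order's ComponentResults, flatten the fields of
    # interest, normalize timestamps, and set later = 1 for results dated
    # after Period_Start_Date.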
    def process_orderh_step_3(self, cda_subset):
        df1 = (
            cda_subset.withColumn(
                "comp_results_item",
                F.explode_outer(F.col("lab_array.ComponentResults")),
            )
            .drop("lab_array")
            .select(
                "*",
                F.col("comp_results_item.ComponentName"),
                F.col("comp_results_item.EffectiveDateTime"),
                F.col("comp_results_item.ResultValue"),
                F.col("comp_results_item.ReferenceRange"),
            )
            .drop("comp_results_item")
            .withColumn(
                "EffectiveDateTime",
                F.when(
                    F.col("EffectiveDateTime").endswith("Z"), F.col("EffectiveDateTime")
                ).otherwise(F.concat(F.col("EffectiveDateTime"), F.lit("Z"))),
            )
        )
        df5 = df1.withColumn(
            "EffectiveDateTime",
            F.to_timestamp("EffectiveDateTime", "yyyy-MM-dd'T'HH:mm:ss'Z'"),
        ).withColumn(
            "Period_Start_Date", F.split(F.col("Period_Start_Date"), r"\+")[0]
        )
        df6 = df5.withColumn(
            "Start_Date", F.to_date("Period_Start_Date")
        ).withColumn(
            "later",
            F.when(F.col("EffectiveDateTime") > F.col("Start_Date"), 1).otherwise(0),
        )
        # df7 = df6.withColumn(
        #     "later",
        #     F.when(
        #         F.col("EffectiveDateTime") > F.col("Period_Start_Date"), 1
        #     ).otherwise(0),
        # )
        # log.info("step 3 df7")
        # df7.printSchema()
        # df7.show(truncate=False)
        return df6
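
    # Step 4: keep only later == 1 rows, then parse ReferenceRange strings
    # such as "2.0 - 8.0 K/uL" into float lower_bound/upper_bound columns.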
    def process_orderh_step_4(self, cda_subset):
        df1 = cda_subset.filter(cda_subset["later"] == 1)
        # log.info("step 4 df1")
        # df1.printSchema()
        # df1.show(truncate=False)
        pattern_number_dot = r"([0-9.]+)"
        df2 = (
            df1.withColumn(
                "lower_bound", F.split(F.col("ReferenceRange"), "-").getItem(0)
            )
            .withColumn(
                "upper_bound_1", F.split(F.col("ReferenceRange"), "-").getItem(1)
            )
            .withColumn(
                "upper_bound",
                F.regexp_extract(F.col("upper_bound_1"), pattern_number_dot, 1),
            )
            .drop("upper_bound_1")
            .withColumn("lower_bound", F.col("lower_bound").cast("float"))
            .withColumn("upper_bound", F.col("upper_bound").cast("float"))
        )
        # log.info("step 4 df2")
        # df2.printSchema()
        # df2.show(truncate=False)
        return df2
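
    # Step 5: for censored values such as "<1.4", extract the number and set
    # hyphen_smaller = 1 when it falls below the parsed lower_bound.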
    def process_orderh_step_5(self, cda_subset):
        pattern_number_dot = r"([0-9.]+)"
        df1 = (
            cda_subset.withColumn(
                "smaller",
                F.when(
                    F.col("ResultValue").startswith("<"),
                    F.regexp_extract(F.col("ResultValue"), pattern_number_dot, 1),
                ).otherwise(None),
            )
            .withColumn("smaller", F.col("smaller").cast("float"))
            .withColumn(
                "hyphen_smaller",
                F.when(F.col("smaller") < F.col("lower_bound"), 1).otherwise(0),
            )
            .drop("smaller")
        )
        return df1
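
    # Full pipeline: steps 1, 3, 4, and 5 chained together.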
    def process_orderh(self, cda_subset):
        df7_0 = cda_subset.drop("OrderHistory")
        df7_1 = self.process_orderh_step_1(cda_subset)
        df7_2 = self.process_orderh_step_3(df7_1)
        # step 4: filter later == 1, keep the current results
        df7_2 = self.process_orderh_step_4(df7_2)
        # step 5: set hyphen_smaller where a "<" ResultValue is below lower_bound
        df7_2 = self.process_orderh_step_5(df7_2)
        return df7_2
if __name__ == "__main__":
    # "cda_subset.csv" is a placeholder path; the file content is in my 2nd post.
    df = spark.read.csv("cda_subset.csv", header=True)
    cda_transform = CDATransformation()
    df1 = cda_transform.preprocess_cda(df)
    df1.collect()
For two rows of data, it consumes more than 400 MB of memory. I will post my data in the 2nd post. I would expect any local DataFrame created inside the process_orderh_step functions to be dereferenced once the function returns, with the corresponding memory released. Why does it consume so much memory, and how can I optimize it?
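As a side note, here is a minimal sketch of how the query plan can be inspected without triggering execution (df_check is just an illustrative name, not part of my pipeline). explain(True) only prints the parsed/analyzed/optimized/physical plans, which shows that all the withColumn/drop steps are fused into one lazy plan that does not run until collect():
df_check = cda_transform.preprocess_cda(df)
df_check.explain(True)  # prints the plans; no job is executed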
09-23-2021 08:41 AM
Here is my input file:
EID,EffectiveTime,OrderHistory,dummy_col,Period_Start_Date
11,2019-04-19T02:50:42.6918667Z,"[{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-2'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T00:35:41Z', 'ResultValue': '500', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-2'}], 'ComponentName': 'component_2', 'EffectiveDateTime': '2019-04-19T00:35:41Z', 'ResultValue': '2.2', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '11-2'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1038007636', 'Narrative': '', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-8'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-3'}], 'ComponentName': 'component_3', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '2.9', 'ResultUnit': 'K/uL', 'ReferenceRange': '4.0 - 11.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '30', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-4'}], 'ComponentName': 'component_4', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '11.7', 'ResultUnit': '%', 'ReferenceRange': '<16.8 %', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-5'}], 'ComponentName': 'component_5', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '1.2', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '11-8'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1038007637', 'Narrative': 'narrative_1', 'Impression': ''}]",dummy_value_1,2019-04-18T23:29:27+00:00
12,2019-04-19T02:50:42.5618605Z,"[{'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-7'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-6'}], 'ComponentName': 'component_6', 'EffectiveDateTime': '2019-04-18T04:01:57Z', 'ResultValue': '<1.4', 'ReferenceRange': '10.0 - 30.0 ug/mL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '298-7'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565237', 'Narrative': 'narrative_21', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-8'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-6'}], 'ComponentName': 'component_6', 'EffectiveDateTime': '2019-04-18T03:01:57Z', 'ResultValue': '<0.5', 'ReferenceRange': '10.0 - 30.0 ug/mL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T06:05:40Z', 'ResultValue': '20', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '5643'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565232', 'Narrative': '', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-9'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '5', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-4'}], 'ComponentName': 'component_4', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '18', 'ResultUnit': '%', 'ReferenceRange': '<16.8 %', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-5'}], 'ComponentName': 'component_5', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '100', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-4'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565229', 'Narrative': 'narrative_22', 'Impression': ''}]",dummy_value_2,2019-04-19T13:29:27+00:00