09-23-2021 08:38 AM
I have PySpark code running on my local Mac, which has 6 cores and 16 GB of RAM. I run it in PyCharm for a first test.
# Local Spark session used for a first test run from the IDE.
# SparkSession was referenced below but never imported (NameError as posted).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("loc")
    # Run Spark inside this process with 2 worker threads.
    .master("local[2]")
    .config("spark.driver.bindAddress", "localhost")
    # NOTE(review): in local mode the driver JVM also does all executor work,
    # so this is the heap cap for the whole job; 12g on a 16 GB machine leaves
    # little headroom for the OS and the Python process — confirm intended.
    .config("spark.driver.memory", "12g")
    .config("spark.sql.broadcastTimeout", "1500")
    .config("spark.scheduler.listenerbus.eventqueue.capacity", "20000")
    # A single shuffle partition: the test input is tiny.
    .config("spark.sql.shuffle.partitions", "1")
    .getOrCreate()
)
from pyspark.sql import functions as F
class CDATransformation:
    """Transform CDA extracts whose ``OrderHistory`` column holds lab orders.

    Every public method takes a Spark DataFrame and returns a new one.
    DataFrame transformations (``withColumn``, ``drop``, ``select``,
    ``filter``) are lazy: they only extend a logical plan. The intermediate
    ``DataFrame`` locals in these methods therefore hold no row data, and
    there is nothing to release when they go out of scope. Rows are computed
    only when an action such as ``collect()`` runs.
    """

    # First run of digits/dots, e.g. "8.0" from " 8.0 K/uL" or "1.4" from "<1.4".
    _NUMBER_PATTERN = r"([0-9.]+)"

    def preprocess_cda(self, cda_subset):
        """Make every ``EffectiveTime`` value end with ``"Z"``.

        :param cda_subset: DataFrame with a string ``EffectiveTime`` column.
        :return: the DataFrame with ``EffectiveTime`` normalized.
        """
        print("***CDA PREPROCSSING***")
        return self._preprocess_cda(cda_subset)

    def _preprocess_cda(self, cda_subset):  # pragma: no cover
        effective_time = cda_subset["EffectiveTime"]
        # Return the new DataFrame; previously it was built and discarded,
        # so preprocess_cda() returned None.
        return cda_subset.withColumn(
            "EffectiveTime",
            F.when(effective_time.endswith("Z"), effective_time).otherwise(
                F.concat(effective_time, F.lit("Z"))
            ),
        )

    @staticmethod
    def _order_history_schema():
        """Return the schema used to parse the ``OrderHistory`` JSON string."""
        from pyspark.sql.types import (
            ArrayType,
            MapType,
            StringType,
            StructField,
            StructType,
        )

        codes = ArrayType(MapType(StringType(), StringType()))
        component_result = StructType(
            [
                StructField("Codes", codes),
                StructField("ComponentName", StringType()),
                StructField("EffectiveDateTime", StringType()),
                StructField("ResultValue", StringType()),
                StructField("ResultUnit", StringType()),
                StructField("ReferenceRange", StringType()),
                StructField("Status", StringType()),
            ]
        )
        return ArrayType(
            StructType(
                [
                    StructField("Codes", codes),
                    StructField("ComponentResults", ArrayType(component_result)),
                    StructField("EffectiveDateTime", StringType()),
                    StructField("ReferenceId", StringType()),
                    StructField("Narrative", StringType()),
                    StructField("Impression", StringType()),
                ]
            )
        )

    def process_orderh_step_1(self, cda_subset):
        """Parse ``OrderHistory`` and explode it to one row per order.

        ``OrderHistory`` is replaced by the parsed array ``first_array``. That
        array is then passed through :meth:`process_orderh_step_2`, which
        explodes it into ``lab_array`` and builds ``main_id``.
        """
        parsed = cda_subset.withColumn(
            "first_array",
            F.from_json(F.col("OrderHistory"), schema=self._order_history_schema()),
        ).drop("OrderHistory")
        # The explode + main_id logic is exactly step 2; reuse it.
        return self.process_orderh_step_2(parsed)

    def process_orderh_step_2(self, cda_subset):
        """Explode ``first_array`` into ``lab_array`` rows and add ``main_id``.

        ``main_id`` is ``EID`` and ``EffectiveTime`` joined by ``"#"``. Both
        source columns and ``first_array`` are dropped.
        """
        exploded = cda_subset.withColumn(
            "lab_array", F.explode_outer(F.col("first_array"))
        ).drop("first_array")
        return exploded.withColumn(
            "main_id", F.concat_ws("#", F.col("EID"), F.col("EffectiveTime"))
        ).drop("EID", "EffectiveTime")

    def process_orderh_step_3(self, cda_subset):
        """Explode component results and flag those after the period start.

        Adds ``ComponentName``, ``EffectiveDateTime`` (timestamp),
        ``ResultValue``, ``ReferenceRange``, ``Start_Date`` and ``later``
        (1/0), and drops ``lab_array``. ``Period_Start_Date`` is cut at the
        first ``"+"``.
        """
        components = (
            cda_subset.withColumn(
                "comp_results_item",
                F.explode_outer(F.col("lab_array.ComponentResults")),
            )
            .drop("lab_array")
            .select(
                "*",
                F.col("comp_results_item.ComponentName"),
                F.col("comp_results_item.EffectiveDateTime"),
                F.col("comp_results_item.ResultValue"),
                F.col("comp_results_item.ReferenceRange"),
            )
            .drop("comp_results_item")
        )
        effective = F.col("EffectiveDateTime")
        # Append a missing "Z" so the value ends with the literal the pattern
        # below expects.
        # NOTE(review): the quoted 'Z' is matched as text, so the value is
        # read in the session time zone, not as UTC — confirm intended.
        stamped = components.withColumn(
            "EffectiveDateTime",
            F.to_timestamp(
                F.when(effective.endswith("Z"), effective).otherwise(
                    F.concat(effective, F.lit("Z"))
                ),
                "yyyy-MM-dd'T'HH:mm:ss'Z'",
            ),
        ).withColumn(
            "Period_Start_Date", F.split(F.col("Period_Start_Date"), r"\+")[0]
        )
        # NOTE(review): compares a timestamp with a date, i.e. midnight of
        # Start_Date, so any result on the start day counts as later —
        # confirm vs. comparing with the full Period_Start_Date time.
        return stamped.withColumn(
            "Start_Date", F.to_date("Period_Start_Date")
        ).withColumn(
            "later",
            F.when(F.col("EffectiveDateTime") > F.col("Start_Date"), 1).otherwise(0),
        )

    def process_orderh_step_4(self, cda_subset):
        """Keep rows with ``later == 1`` and parse ``ReferenceRange`` bounds.

        ``ReferenceRange`` is split on ``"-"``. ``lower_bound`` is the first
        part and ``upper_bound`` is the first number in the second part; both
        are cast to float. Unparseable or missing parts become null.
        """
        later_only = cda_subset.filter(F.col("later") == 1)
        range_parts = F.split(F.col("ReferenceRange"), "-")
        return later_only.withColumn(
            "lower_bound", range_parts.getItem(0).cast("float")
        ).withColumn(
            "upper_bound",
            F.regexp_extract(range_parts.getItem(1), self._NUMBER_PATTERN, 1).cast(
                "float"
            ),
        )

    def process_orderh_step_5(self, cda_subset):
        """Add ``hyphen_smaller``: 1 when a ``"<x"`` result has x < ``lower_bound``.

        Results not starting with ``"<"`` (or with a null bound) get 0.
        """
        result_value = F.col("ResultValue")
        # Null unless the result is of the form "<number".
        smaller = F.when(
            result_value.startswith("<"),
            F.regexp_extract(result_value, self._NUMBER_PATTERN, 1),
        ).cast("float")
        return cda_subset.withColumn(
            "hyphen_smaller",
            F.when(smaller < F.col("lower_bound"), 1).otherwise(0),
        )

    def process_orderh(self, cda_subset):
        """Run the OrderHistory pipeline: steps 1, 3, 4 and 5 in order."""
        orders = self.process_orderh_step_1(cda_subset)
        components = self.process_orderh_step_3(orders)
        # Keep only results later than the period start; parse reference ranges.
        current = self.process_orderh_step_4(components)
        # Flag "<x" results whose value is below the reference lower bound.
        return self.process_orderh_step_5(current)
if __name__ == "__main__":
    import sys

    # The posted script used an undefined ``df``; load the input CSV instead.
    if len(sys.argv) != 2:
        sys.exit(f"usage: {sys.argv[0]} <input.csv>")
    # header=True: the first line names the columns (EID, EffectiveTime, ...).
    # The default '"' quote keeps the comma-laden OrderHistory JSON in one field.
    df = spark.read.csv(sys.argv[1], header=True)
    cda_transform = CDATransformation()
    df1 = cda_transform.preprocess_cda(df)
    # collect() is the action that actually executes the lazy plan.
    for row in df1.collect():
        print(row)
For just two rows of data, it consumes more than 400 MB of memory. I will post my data in a second post. I expected every local DataFrame created in the functions whose names begin with process_orderh_step to be dereferenced when the function returns, and its memory to be released. Why does it consume so much memory, and how can I optimize it?
09-23-2021 08:41 AM
Here is my input file
EID,EffectiveTime,OrderHistory,dummy_col,Period_Start_Date
11,2019-04-19T02:50:42.6918667Z,"[{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-2'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T00:35:41Z', 'ResultValue': '500', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-2'}], 'ComponentName': 'component_2', 'EffectiveDateTime': '2019-04-19T00:35:41Z', 'ResultValue': '2.2', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '11-2'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1038007636', 'Narrative': '', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-8'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-3'}], 'ComponentName': 'component_3', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '2.9', 'ResultUnit': 'K/uL', 'ReferenceRange': '4.0 - 11.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '30', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-4'}], 'ComponentName': 'component_4', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '11.7', 'ResultUnit': '%', 'ReferenceRange': '<16.8 %', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-5'}], 'ComponentName': 'component_5', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '1.2', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '11-8'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'Status': 'completed'}], 'ReferenceId': 
'Result1038007637', 'Narrative': 'narrative_1', 'Impression': ''}]",dummy_value_1,2019-04-18T23:29:27+00:00
12,2019-04-19T02:50:42.5618605Z,"[{'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-7'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-6'}], 'ComponentName': 'component_6', 'EffectiveDateTime': '2019-04-18T04:01:57Z', 'ResultValue': '<1.4', 'ReferenceRange': '10.0 - 30.0 ug/mL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '298-7'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565237', 'Narrative': 'narrative_21', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-8'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-6'}], 'ComponentName': 'component_6', 'EffectiveDateTime': '2019-04-18T03:01:57Z', 'ResultValue': '<0.5', 'ReferenceRange': '10.0 - 30.0 ug/mL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T06:05:40Z', 'ResultValue': '20', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '5643'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565232', 'Narrative': '', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-9'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '5', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-4'}], 'ComponentName': 'component_4', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '18', 'ResultUnit': '%', 'ReferenceRange': '<16.8 %', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-5'}], 'ComponentName': 'component_5', 
'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '100', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-4'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565229', 'Narrative': 'narrative_22', 'Impression': ''}]",dummy_value_2,2019-04-19T13:29:27+00:00
Passionate about hosting events and connecting people? Help us grow a vibrant local community—sign up today to get started!
Sign Up Now