cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

PySpark intermediate DataFrames consume a lot of memory

fymaterials_199
New Contributor II

I have PySpark code running on my local Mac, which has 6 cores and 16 GB of RAM. I run it in PyCharm as a first test.

from pyspark.sql import SparkSession

# Local session for a first test run: two worker threads in a single JVM.
spark = (
    SparkSession.builder.appName("loc")
    .master("local[2]")
    .config("spark.driver.bindAddress", "localhost")
    # Memory requested for the driver process.
    .config("spark.driver.memory", "12g")
    .config("spark.sql.broadcastTimeout", "1500")
    .config("spark.scheduler.listenerbus.eventqueue.capacity", "20000")
    # A single shuffle partition.
    .config("spark.sql.shuffle.partitions", "1")
    .getOrCreate()
)
from pyspark.sql import functions as F
from pyspark.sql.types import (
    ArrayType,
    MapType,
    StringType,
    StructField,
    StructType,
)
 
class CDATransformation:
    """DataFrame transformations for CDA rows carrying an ``OrderHistory`` JSON column.

    ``preprocess_cda`` normalises the ``EffectiveTime`` column.
    ``process_orderh`` parses ``OrderHistory``, explodes it to one row per
    component result, and derives the ``later``, ``lower_bound``,
    ``upper_bound`` and ``hyphen_smaller`` columns.

    Every method takes a DataFrame and returns a new one; none of them
    triggers a Spark action.
    """

    # Captures the first run of digits and dots in a string.
    _NUMBER_PATTERN = r"([0-9.]+)"

    def preprocess_cda(self, cda_subset):
        """Print a progress banner and return the preprocessed DataFrame."""
        print("***CDA PREPROCSSING***")
        return self._preprocess_cda(cda_subset)

    def _preprocess_cda(self, cda_subset):  # pragma: no cover
        """Append ``"Z"`` to ``EffectiveTime`` values that do not already end with it.

        Returns:
            ``cda_subset`` with the ``EffectiveTime`` column replaced.
        """
        effective_time = cda_subset["EffectiveTime"]
        # The result must be returned: preprocess_cda hands it to the caller.
        return cda_subset.withColumn(
            "EffectiveTime",
            F.when(effective_time.endswith("Z"), effective_time).otherwise(
                F.concat(effective_time, F.lit("Z"))
            ),
        )

    def process_orderh_step_1(self, cda_subset):
        """Parse ``OrderHistory`` and explode it to one row per order.

        The JSON text in ``OrderHistory`` is parsed into ``first_array`` (an
        array of order structs) and ``OrderHistory`` is dropped; the explode
        and ``main_id`` derivation are delegated to ``process_orderh_step_2``.

        Returns:
            DataFrame with ``lab_array`` and ``main_id`` columns, without
            ``OrderHistory``, ``EID`` and ``EffectiveTime``.
        """
        codes_type = ArrayType(MapType(StringType(), StringType()))
        component_result_type = StructType(
            [
                StructField("Codes", codes_type),
                StructField("ComponentName", StringType()),
                StructField("EffectiveDateTime", StringType()),
                StructField("ResultValue", StringType()),
                StructField("ResultUnit", StringType()),
                StructField("ReferenceRange", StringType()),
                StructField("Status", StringType()),
            ]
        )
        order_type = StructType(
            [
                StructField("Codes", codes_type),
                StructField("ComponentResults", ArrayType(component_result_type)),
                StructField("EffectiveDateTime", StringType()),
                StructField("ReferenceId", StringType()),
                StructField("Narrative", StringType()),
                StructField("Impression", StringType()),
            ]
        )
        schema = ArrayType(order_type)

        parsed = cda_subset.withColumn(
            "first_array", F.from_json(F.col("OrderHistory"), schema=schema)
        ).drop("OrderHistory")
        return self.process_orderh_step_2(parsed)

    def process_orderh_step_2(self, cda_subset):
        """Explode ``first_array`` into ``lab_array`` and build ``main_id``.

        ``main_id`` is ``EID`` and ``EffectiveTime`` joined with ``"#"``; the
        two source columns and ``first_array`` are dropped.
        """
        exploded = cda_subset.withColumn(
            "lab_array", F.explode_outer(F.col("first_array"))
        ).drop("first_array")
        return exploded.withColumn(
            "main_id", F.concat_ws("#", F.col("EID"), F.col("EffectiveTime"))
        ).drop("EID", "EffectiveTime")

    def process_orderh_step_3(self, cda_subset):
        """Explode component results and flag those later than the period start.

        Produces one row per entry of ``lab_array.ComponentResults``, promotes
        ``ComponentName``, ``EffectiveDateTime``, ``ResultValue`` and
        ``ReferenceRange`` to top-level columns, parses ``EffectiveDateTime``
        into a timestamp, and adds:

        * ``Start_Date`` - date parsed from the part of ``Period_Start_Date``
          before the first ``+``;
        * ``later`` - 1 when ``EffectiveDateTime`` is greater than
          ``Start_Date``, otherwise 0.
        """
        components = (
            cda_subset.withColumn(
                "comp_results_item",
                F.explode_outer(F.col("lab_array.ComponentResults")),
            )
            .drop("lab_array")
            .select(
                "*",
                F.col("comp_results_item.ComponentName"),
                F.col("comp_results_item.EffectiveDateTime"),
                F.col("comp_results_item.ResultValue"),
                F.col("comp_results_item.ReferenceRange"),
            )
            .drop("comp_results_item")
        )

        # Make sure the value ends with "Z" so the timestamp format below matches.
        effective = F.col("EffectiveDateTime")
        with_suffix = components.withColumn(
            "EffectiveDateTime",
            F.when(effective.endswith("Z"), effective).otherwise(
                F.concat(effective, F.lit("Z"))
            ),
        )

        parsed = with_suffix.withColumn(
            "EffectiveDateTime",
            F.to_timestamp("EffectiveDateTime", "yyyy-MM-dd'T'HH:mm:ss'Z'"),
        ).withColumn(
            # Keep only the part before the "+" offset.
            "Period_Start_Date",
            F.split(F.col("Period_Start_Date"), r"\+")[0],
        )

        return parsed.withColumn(
            "Start_Date", F.to_date("Period_Start_Date")
        ).withColumn(
            "later",
            F.when(F.col("EffectiveDateTime") > F.col("Start_Date"), 1).otherwise(0),
        )

    def process_orderh_step_4(self, cda_subset):
        """Keep rows with ``later == 1`` and split ``ReferenceRange`` into bounds.

        ``lower_bound`` is the text before the first ``-`` cast to float;
        ``upper_bound`` is the first number found in the text after that
        ``-``, cast to float.
        """
        current = cda_subset.filter(cda_subset["later"] == 1)
        range_parts = F.split(F.col("ReferenceRange"), "-")

        return (
            current.withColumn("lower_bound", range_parts.getItem(0))
            .withColumn("upper_bound_1", range_parts.getItem(1))
            .withColumn(
                "upper_bound",
                F.regexp_extract(F.col("upper_bound_1"), self._NUMBER_PATTERN, 1),
            )
            .drop("upper_bound_1")
            .withColumn("lower_bound", F.col("lower_bound").cast("float"))
            .withColumn("upper_bound", F.col("upper_bound").cast("float"))
        )

    def process_orderh_step_5(self, cda_subset):
        """Add ``hyphen_smaller``: 1 when a ``<``-prefixed result is below ``lower_bound``.

        For ``ResultValue`` strings starting with ``<`` the first number is
        extracted and cast to float; other rows get null. The temporary
        ``smaller`` column is dropped before returning.
        """
        return (
            cda_subset.withColumn(
                "smaller",
                F.when(
                    F.col("ResultValue").startswith("<"),
                    F.regexp_extract(
                        F.col("ResultValue"), self._NUMBER_PATTERN, 1
                    ),
                ).otherwise(None),
            )
            .withColumn("smaller", F.col("smaller").cast("float"))
            .withColumn(
                "hyphen_smaller",
                F.when(F.col("smaller") < F.col("lower_bound"), 1).otherwise(0),
            )
            .drop("smaller")
        )

    def process_orderh(self, cda_subset):
        """Run steps 1, 3, 4 and 5 in order and return the final DataFrame.

        Step 2 is not called directly because step 1 already delegates to it.
        """
        exploded = self.process_orderh_step_1(cda_subset)
        flagged = self.process_orderh_step_3(exploded)
        # filter later == 1, keep the current results
        current = self.process_orderh_step_4(flagged)
        # step 5: flag "<"-prefixed ResultValue entries below the lower bound
        return self.process_orderh_step_5(current)
 
if __name__ == "__main__":
    # NOTE(review): `df` is not defined anywhere in this snippet - presumably
    # the input CSV loaded as a DataFrame; confirm against the real script.
    preprocessed = CDATransformation().preprocess_cda(df)
    # collect() is the action that makes Spark evaluate the plan.
    preprocessed.collect()

For two rows of data, it consumes more than 400 MB of memory. I will post my data in the second post. I think any local DataFrame created in the functions whose names begin with process_orderh_step should be dereferenced and the corresponding memory released. Why does it consume so much memory? How can I optimize it?

1 REPLY 1

fymaterials_199
New Contributor II

Here is my input file

EID,EffectiveTime,OrderHistory,dummy_col,Period_Start_Date

11,2019-04-19T02:50:42.6918667Z,"[{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-2'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T00:35:41Z', 'ResultValue': '500', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-2'}], 'ComponentName': 'component_2', 'EffectiveDateTime': '2019-04-19T00:35:41Z', 'ResultValue': '2.2', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '11-2'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1038007636', 'Narrative': '', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-8'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-3'}], 'ComponentName': 'component_3', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '2.9', 'ResultUnit': 'K/uL', 'ReferenceRange': '4.0 - 11.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '30', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-4'}], 'ComponentName': 'component_4', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '11.7', 'ResultUnit': '%', 'ReferenceRange': '<16.8 %', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-5'}], 'ComponentName': 'component_5', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '1.2', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '11-8'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'Status': 'completed'}], 'ReferenceId': 
'Result1038007637', 'Narrative': 'narrative_1', 'Impression': ''}]",dummy_value_1,2019-04-18T23:29:27+00:00

12,2019-04-19T02:50:42.5618605Z,"[{'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-7'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-6'}], 'ComponentName': 'component_6', 'EffectiveDateTime': '2019-04-18T04:01:57Z', 'ResultValue': '<1.4', 'ReferenceRange': '10.0 - 30.0 ug/mL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '298-7'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565237', 'Narrative': 'narrative_21', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-8'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-6'}], 'ComponentName': 'component_6', 'EffectiveDateTime': '2019-04-18T03:01:57Z', 'ResultValue': '<0.5', 'ReferenceRange': '10.0 - 30.0 ug/mL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T06:05:40Z', 'ResultValue': '20', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '5643'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565232', 'Narrative': '', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-9'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '5', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-4'}], 'ComponentName': 'component_4', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '18', 'ResultUnit': '%', 'ReferenceRange': '<16.8 %', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-5'}], 'ComponentName': 'component_5', 
'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '100', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-4'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565229', 'Narrative': 'narrative_22', 'Impression': ''}]",dummy_value_2,2019-04-19T13:29:27+00:00

Welcome to Databricks Community: Let's learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.