Data Engineering

PySpark intermediate DataFrames consume a lot of memory

fymaterials_199
New Contributor II

I have PySpark code running on my local Mac, which has 6 cores and 16 GB of RAM. I run it in PyCharm as a first test.

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder.appName("loc")
    .master("local[2]")
    .config("spark.driver.bindAddress","localhost")
    .config("spark.driver.memory", "12g")
    .config("spark.sql.broadcastTimeout", "1500")
    .config("spark.scheduler.listenerbus.eventqueue.capacity", "20000")
    .config("spark.sql.shuffle.partitions", "1")
    .getOrCreate()
)
from pyspark.sql import functions as F
from pyspark.sql.types import (
    ArrayType,
    MapType,
    StringType,
    StructField,
    StructType,
)
 
class CDATransformation:
    def __init__(
        self
    ):
        pass
 
    def preprocess_cda(self, cda_subset):
        print("***CDA PREPROCSSING***")
        return self._preprocess_cda(cda_subset)
 
    def _preprocess_cda(self, cda_subset):  # pragma: no cover
        # Normalize EffectiveTime so every value ends with a trailing "Z".
        df = cda_subset.withColumn(
            "EffectiveTime",
            F.when(
                cda_subset["EffectiveTime"].endswith("Z"), cda_subset["EffectiveTime"]
            ).otherwise(F.concat(cda_subset["EffectiveTime"], F.lit("Z"))),
        )
        # Run the OrderHistory processing steps and return the result.
        return self.process_orderh(df)
  
    def process_orderh_step_1(self, cda_subset):
        schema = ArrayType(
            StructType(
                [
                    StructField(
                        "Codes", ArrayType(MapType(StringType(), StringType()))
                    ),
                    StructField(
                        "ComponentResults",
                        ArrayType(
                            StructType(
                                [
                                    StructField(
                                        "Codes",
                                        ArrayType(MapType(StringType(), StringType())),
                                    ),
                                    StructField("ComponentName", StringType()),
                                    StructField("EffectiveDateTime", StringType()),
                                    StructField("ResultValue", StringType()),
                                    StructField("ResultUnit", StringType()),
                                    StructField("ReferenceRange", StringType()),
                                    StructField("Status", StringType()),
                                ]
                            )
                        ),
                    ),
                    StructField("EffectiveDateTime", StringType()),
                    StructField("ReferenceId", StringType()),
                    StructField("Narrative", StringType()),
                    StructField("Impression", StringType()),
                ]
            )
        )
 
        df1 = (
            cda_subset.withColumn(
                "first_array",
                F.from_json(F.col("OrderHistory"), schema=schema),
            )
            .drop("OrderHistory")
            .withColumn("lab_array", F.explode_outer(F.col("first_array")))
            .drop("first_array")
            .withColumn(
                "main_id", F.concat_ws("#", F.col("EID"), F.col("EffectiveTime"))
            )
            .drop("EID", "EffectiveTime")
        )
 
        return df1
 
    def process_orderh_step_2(self, cda_subset):
        df1 = cda_subset.withColumn(
            "lab_array", F.explode_outer(F.col("first_array"))
        ).drop("first_array")
        df2 = df1.withColumn(
            "main_id", F.concat_ws("#", F.col("EID"), F.col("EffectiveTime"))
        ).drop("EID", "EffectiveTime")
        return df2
 
    def process_orderh_step_3(self, cda_subset):
        df1 = (
            cda_subset.withColumn(
                "comp_results_item",
                F.explode_outer(F.col("lab_array.ComponentResults")),
            )
            .drop("lab_array")
            .select(
                "*",
                F.col("comp_results_item.ComponentName"),
                F.col("comp_results_item.EffectiveDateTime"),
                F.col("comp_results_item.ResultValue"),
                F.col("comp_results_item.ReferenceRange"),
            )
            .drop("comp_results_item")
            .withColumn(
                "EffectiveDateTime",
                F.when(
                    F.col("EffectiveDateTime").endswith("Z"), F.col("EffectiveDateTime")
                ).otherwise(F.concat(F.col("EffectiveDateTime"), F.lit("Z"))),
            )
        )
     
 
        df5 = df1.withColumn(
            "EffectiveDateTime",
            F.to_timestamp("EffectiveDateTime", "yyyy-MM-dd'T'HH:mm:ss'Z'"),
        ).withColumn(
            "Period_Start_Date", F.split(F.col("Period_Start_Date"), r"\+")[0]
        )
 
        df6 = df5.withColumn(
            "Start_Date", F.to_date("Period_Start_Date")
        ).withColumn(
            "later",
            F.when(F.col("EffectiveDateTime") > F.col("Start_Date"), 1).otherwise(0),
        )
 
        # df7 = df6.withColumn(
        #     "later",
        #     F.when(
        #         F.col("EffectiveDateTime") > F.col("Period_Start_Date"), 1
        #     ).otherwise(0),
        # )
        # log.info("step 3 df7")
        # df7.printSchema()
        # df7.show(truncate=False)
        return df6
 
    def process_orderh_step_4(self, cda_subset):
        df1 = cda_subset.filter(cda_subset["later"] == 1)
        # log.info("step 4 df1")
        # df1.printSchema()
        # df1.show(truncate=False)
        pattern_number_dot = r"([0-9.]+)"
 
        df2 = (
            df1.withColumn(
                "lower_bound", F.split(F.col("ReferenceRange"), "-").getItem(0)
            )
            .withColumn(
                "upper_bound_1", F.split(F.col("ReferenceRange"), "-").getItem(1)
            )
            .withColumn(
                "upper_bound",
                F.regexp_extract(F.col("upper_bound_1"), pattern_number_dot, 1),
            )
            .drop("upper_bound_1")
            .withColumn("lower_bound", F.col("lower_bound").cast("float"))
            .withColumn("upper_bound", F.col("upper_bound").cast("float"))
        )
        # log.info("step 4 df2")
        # df2.printSchema()
        # df2.show(truncate=False)
 
        return df2
 
    def process_orderh_step_5(self, cda_subset):
        pattern_number_dot = r"([0-9.]+)"
        df1 = (
            cda_subset.withColumn(
                "smaller",
                F.when(
                    F.col("ResultValue").startswith("<"),
                    F.regexp_extract(F.col("ResultValue"), pattern_number_dot, 1),
                ).otherwise(None),
            )
            .withColumn("smaller", F.col("smaller").cast("float"))
            .withColumn(
                "hyphen_smaller",
                F.when(F.col("smaller") < F.col("lower_bound"), 1).otherwise(0),
            )
            .drop("smaller")
        )
 
        return df1
 
 
    def process_orderh(self, cda_subset):
        df7_1 = self.process_orderh_step_1(cda_subset)
        df7_2 = self.process_orderh_step_3(df7_1)
        # step 4: keep only rows where later == 1 (the current results)
        df7_2 = self.process_orderh_step_4(df7_2)
        # step 5: flag rows where a "<" ResultValue falls below lower_bound
        df7_2 = self.process_orderh_step_5(df7_2)

        return df7_2
 
if __name__ == "__main__":
    # "input.csv" is a placeholder path for the sample file posted in the reply.
    df = spark.read.csv("input.csv", header=True)
    cda_transform = CDATransformation()
    df1 = cda_transform.preprocess_cda(df)
    df1.collect()

For two rows of data, it consumes more than 400 MB of memory. I will post my data in the 2nd post. I would expect any local DataFrame created in the functions beginning with process_orderh_step to be dereferenced when the function returns, with the corresponding memory released. Why does it consume so much memory, and how can I optimize it?
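For context, the intermediate DataFrames created inside the process_orderh_step functions are lazy query plans rather than materialized data, so dereferencing them frees almost nothing: the memory cost appears when collect() executes the whole fused plan on the driver JVM. A minimal sketch for checking this (assuming the session, class, and df from the code above):

cda_transform = CDATransformation()
df1 = cda_transform.preprocess_cda(df)
# All of the process_orderh_step transformations collapse into one plan:
df1.explain("formatted")

# If the long lineage itself becomes expensive, one option is to
# materialize the data once and truncate the plan history:
df_short = df1.localCheckpoint()
df_short.collect()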

1 REPLY

fymaterials_199
New Contributor II

Here is my input file:

EID,EffectiveTime,OrderHistory,dummy_col,Period_Start_Date

11,2019-04-19T02:50:42.6918667Z,"[{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-2'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T00:35:41Z', 'ResultValue': '500', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-2'}], 'ComponentName': 'component_2', 'EffectiveDateTime': '2019-04-19T00:35:41Z', 'ResultValue': '2.2', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '11-2'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1038007636', 'Narrative': '', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-8'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-3'}], 'ComponentName': 'component_3', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '2.9', 'ResultUnit': 'K/uL', 'ReferenceRange': '4.0 - 11.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '30', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-4'}], 'ComponentName': 'component_4', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '11.7', 'ResultUnit': '%', 'ReferenceRange': '<16.8 %', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-5'}], 'ComponentName': 'component_5', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '1.2', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '11-8'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1038007637', 'Narrative': 'narrative_1', 'Impression': ''}]",dummy_value_1,2019-04-18T23:29:27+00:00

12,2019-04-19T02:50:42.5618605Z,"[{'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-7'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-6'}], 'ComponentName': 'component_6', 'EffectiveDateTime': '2019-04-18T04:01:57Z', 'ResultValue': '<1.4', 'ReferenceRange': '10.0 - 30.0 ug/mL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '298-7'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565237', 'Narrative': 'narrative_21', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-8'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-6'}], 'ComponentName': 'component_6', 'EffectiveDateTime': '2019-04-18T03:01:57Z', 'ResultValue': '<0.5', 'ReferenceRange': '10.0 - 30.0 ug/mL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T06:05:40Z', 'ResultValue': '20', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '5643'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565232', 'Narrative': '', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-9'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '5', 'ResultUnit': 'mg/dL', 'ReferenceRange': '<10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-4'}], 'ComponentName': 'component_4', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '18', 'ResultUnit': '%', 'ReferenceRange': '<16.8 %', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-5'}], 'ComponentName': 'component_5', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '100', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-4'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565229', 'Narrative': 'narrative_22', 'Impression': ''}]",dummy_value_2,2019-04-19T13:29:27+00:00
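Note that the OrderHistory values use single-quoted JSON. Spark's JSON parser accepts this by default (allowSingleQuotes is true), so the from_json call in process_orderh_step_1 should parse them without extra options. A minimal standalone check, assuming an active spark session:

from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, MapType, StringType

# A single-quoted JSON string, shaped like a fragment of OrderHistory above.
sample = spark.createDataFrame([("[{'Code': '1-2'}]",)], ["OrderHistory"])
parsed = sample.withColumn(
    "parsed",
    F.from_json("OrderHistory", ArrayType(MapType(StringType(), StringType()))),
)
parsed.show(truncate=False)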
