<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: pyspark intermediate dataframe consumes many memory in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/pyspark-intermediate-dataframe-consumes-many-memory/m-p/14622#M9094</link>
    <description>&lt;P&gt;Here is my input file&lt;/P&gt;&lt;P&gt;EID,EffectiveTime,OrderHistory,dummy_col,Period_Start_Date&lt;/P&gt;&lt;P&gt;11,2019-04-19T02:50:42.6918667Z,"[{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-2'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T00:35:41Z', 'ResultValue': '500', 'ResultUnit': 'mg/dL', 'ReferenceRange': '&amp;lt;10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-2'}], 'ComponentName': 'component_2', 'EffectiveDateTime': '2019-04-19T00:35:41Z', 'ResultValue': '2.2', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '11-2'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1038007636', 'Narrative': '', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-8'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-3'}], 'ComponentName': 'component_3', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '2.9', 'ResultUnit': 'K/uL', 'ReferenceRange': '4.0 - 11.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '30', 'ResultUnit': 'mg/dL', 'ReferenceRange': '&amp;lt;10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-4'}], 'ComponentName': 'component_4', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '11.7', 'ResultUnit': '%', 'ReferenceRange': '&amp;lt;16.8 %', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-5'}], 'ComponentName': 'component_5', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '1.2', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 
'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '11-8'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1038007637', 'Narrative': 'narrative_1', 'Impression': ''}]",dummy_value_1,2019-04-18T23:29:27+00:00&lt;/P&gt;&lt;P&gt;12,2019-04-19T02:50:42.5618605Z,"[{'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-7'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-6'}], 'ComponentName': 'component_6', 'EffectiveDateTime': '2019-04-18T04:01:57Z', 'ResultValue': '&amp;lt;1.4', 'ReferenceRange': '10.0 - 30.0 ug/mL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '298-7'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565237', 'Narrative': 'narrative_21', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-8'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-6'}], 'ComponentName': 'component_6', 'EffectiveDateTime': '2019-04-18T03:01:57Z', 'ResultValue': '&amp;lt;0.5', 'ReferenceRange': '10.0 - 30.0 ug/mL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T06:05:40Z', 'ResultValue': '20', 'ResultUnit': 'mg/dL', 'ReferenceRange': '&amp;lt;10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '5643'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565232', 'Narrative': '', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-9'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '5', 'ResultUnit': 'mg/dL', 'ReferenceRange': '&amp;lt;10 mg/dL', 'Status': 'completed'}, {'Codes': 
[{'CodeSystem': 'sys_1', 'Code': '1-4'}], 'ComponentName': 'component_4', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '18', 'ResultUnit': '%', 'ReferenceRange': '&amp;lt;16.8 %', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-5'}], 'ComponentName': 'component_5', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '100', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-4'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565229', 'Narrative': 'narrative_22', 'Impression': ''}]",dummy_value_2,2019-04-19T13:29:27+00:00&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
    <pubDate>Thu, 23 Sep 2021 15:41:37 GMT</pubDate>
    <dc:creator>fymaterials_199</dc:creator>
    <dc:date>2021-09-23T15:41:37Z</dc:date>
    <item>
      <title>pyspark intermediate dataframe consumes many memory</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-intermediate-dataframe-consumes-many-memory/m-p/14621#M9093</link>
      <description>&lt;P&gt;I have PySpark code running on my local Mac, which has 6 cores and 16 GB of memory. I run it in PyCharm for a first test.&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;spark = (
    SparkSession.builder.appName("loc")
    .master("local[2]")
    .config("spark.driver.bindAddress","localhost")
    .config("spark.driver.memory", "12g")
    .config("spark.sql.broadcastTimeout", "1500")
    .config("spark.scheduler.listenerbus.eventqueue.capacity", "20000")
    .config("spark.sql.shuffle.partitions", "1")
    .getOrCreate()
)
from pyspark.sql import functions as F
&amp;nbsp;
class CDATransformation:
    def __init__(
        self
    ):
        pass
&amp;nbsp;
    def preprocess_cda(self, cda_subset):
        print("***CDA PREPROCSSING***")
        return self._preprocess_cda(cda_subset)
&amp;nbsp;
    def _preprocess_cda(self, cda_subset):  # pragma: no cover
        df = cda_subset.withColumn(
            "EffectiveTime",
            F.when(
                cda_subset["EffectiveTime"].endswith("Z"), cda_subset["EffectiveTime"]
            ).otherwise(F.concat(cda_subset["EffectiveTime"], F.lit("Z"))),
        )
  
       def process_orderh_step_1(self, cda_subset):
        schema = ArrayType(
            StructType(
                [
                    StructField(
                        "Codes", ArrayType(MapType(StringType(), StringType()))
                    ),
                    StructField(
                        "ComponentResults",
                        ArrayType(
                            StructType(
                                [
                                    StructField(
                                        "Codes",
                                        ArrayType(MapType(StringType(), StringType())),
                                    ),
                                    StructField("ComponentName", StringType()),
                                    StructField("EffectiveDateTime", StringType()),
                                    StructField("ResultValue", StringType()),
                                    StructField("ResultUnit", StringType()),
                                    StructField("ReferenceRange", StringType()),
                                    StructField("Status", StringType()),
                                ]
                            )
                        ),
                    ),
                    StructField("EffectiveDateTime", StringType()),
                    StructField("ReferenceId", StringType()),
                    StructField("Narrative", StringType()),
                    StructField("Impression", StringType()),
                ]
            )
        )
&amp;nbsp;
        df1 = (
            cda_subset.withColumn(
            "first_array",
            F.from_json(
                F.col("OrderHistory"), schema=schema),
            ).drop("OrderHistory")
             .withColumn(
            "lab_array", F.explode_outer(F.col("first_array"))
            ).drop("first_array")
             .withColumn(
            "main_id", F.concat_ws("#", F.col("EID"), F.col("EffectiveTime"))
            ).drop("EID", "EffectiveTime")
        )
&amp;nbsp;
        return df1
&amp;nbsp;
    def process_orderh_step_2(self, cda_subset):
        df1 = cda_subset.withColumn(
            "lab_array", F.explode_outer(F.col("first_array"))
        ).drop("first_array")
        df2 = df1.withColumn(
            "main_id", F.concat_ws("#", F.col("EID"), F.col("EffectiveTime"))
        ).drop("EID", "EffectiveTime")
        return df2
&amp;nbsp;
    def process_orderh_step_3(self, cda_subset):
        df1 = (
                  cda_subset.withColumn(
                "comp_results_item", F.explode_outer(F.col("lab_array.ComponentResults"))
            ).drop("lab_array")
             .select(
            "*",
            F.col("comp_results_item.ComponentName"),
            F.col("comp_results_item.EffectiveDateTime"),
            F.col("comp_results_item.ResultValue"),
            F.col("comp_results_item.ReferenceRange"),
            ).drop("comp_results_item")
             .withColumn(
            "EffectiveDateTime",
            F.when(
                F.col("EffectiveDateTime").endswith("Z"), F.col("EffectiveDateTime")
            ).otherwise(F.concat(F.col("EffectiveDateTime"), F.lit("Z"))),
        )
        )
     
&amp;nbsp;
        df5 = df1.withColumn(
            "EffectiveDateTime",
            F.to_timestamp("EffectiveDateTime", "yyyy-MM-dd'T'HH:mm:ss'Z'"),
        ).withColumn(
            "Period_Start_Date", F.split(F.col("Period_Start_Date"), r"\+")[0]
        )
&amp;nbsp;
        df6 = df5.withColumn(
            "Start_Date", F.to_date("Period_Start_Date")
        ).withColumn(
            "later",
            F.when(F.col("EffectiveDateTime") &amp;gt; F.col("Start_Date"), 1).otherwise(0),
        )
&amp;nbsp;
        # df7 = df6.withColumn(
        #     "later",
        #     F.when(
        #         F.col("EffectiveDateTime") &amp;gt; F.col("Period_Start_Date"), 1
        #     ).otherwise(0),
        # )
        # log.info("step 3 df7")
        # df7.printSchema()
        # df7.show(truncate=False)
        return df6
&amp;nbsp;
    def process_orderh_step_4(self, cda_subset):
        df1 = cda_subset.filter(cda_subset["later"] == 1)
        # log.info("step 4 df1")
        # df1.printSchema()
        # df1.show(truncate=False)
        pattern_number_dot = r"([0-9.]+)"
&amp;nbsp;
        df2 = df1.withColumn(
            "lower_bound", F.split(F.col("ReferenceRange"), "-").getItem(0)
        ).withColumn(
            "upper_bound_1", F.split(F.col("ReferenceRange"), "-").getItem(1)
        ).withColumn(
            "upper_bound",
            F.regexp_extract(F.col("upper_bound_1"), pattern_number_dot, 1),
        ).drop("upper_bound_1").withColumn(
            "lower_bound", F.col("lower_bound").cast("float")
        ).withColumn("upper_bound", F.col("upper_bound").cast("float"))
        # log.info("step 4 df2")
        # df2.printSchema()
        # df2.show(truncate=False)
&amp;nbsp;
        return df2
&amp;nbsp;
    def process_orderh_step_5(self, cda_subset):
        pattern_number_dot = r"([0-9.]+)"
        df1 = cda_subset.withColumn(
            "smaller",
            F.when(
                F.col("ResultValue").startswith("&amp;lt;"),
                F.regexp_extract(F.col("ResultValue"), pattern_number_dot, 1),
            ).otherwise(None),
        ).withColumn(
            "smaller", F.col("smaller").cast("float")
        ).withColumn(
            "hyphen_smaller",
            F.when(F.col("smaller") &amp;lt; F.col("lower_bound"), 1).otherwise(0),
        ).drop("smaller")
&amp;nbsp;
        return df1
&amp;nbsp;
&amp;nbsp;
    def process_orderh(self, cda_subset):
        df7_0 = cda_subset.drop("OrderHistory")
        df7_1 = self.process_orderh_step_1(cda_subset)
        df7_2 = self.process_orderh_step_3(df7_1)
        # filter later == 1, keep the current results
        df7_2 = self.process_orderh_step_4(df7_2)
        # step 5 hyphen smaller than lower bound ResultVale has &amp;lt;
        df7_2 = self.process_orderh_step_5(df7_2)
        
        return df7_2
&amp;nbsp;
if __name__ == "__main__":
    cda_transform = CDATransformation()
    df1 = cda_transform.preprocess_cda(df)
    df1.collect()&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;For two rows of data, it consumes &amp;gt; 400 MB of memory. I will post my data in a second post. I think any local DataFrame created in the functions whose names begin with process_orderh_step should be dereferenced and its memory released. Why does it consume so much memory? How can I optimize it?&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 23 Sep 2021 15:38:26 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-intermediate-dataframe-consumes-many-memory/m-p/14621#M9093</guid>
      <dc:creator>fymaterials_199</dc:creator>
      <dc:date>2021-09-23T15:38:26Z</dc:date>
    </item>
    <item>
      <title>Re: pyspark intermediate dataframe consumes many memory</title>
      <link>https://community.databricks.com/t5/data-engineering/pyspark-intermediate-dataframe-consumes-many-memory/m-p/14622#M9094</link>
      <description>&lt;P&gt;Here is my input file&lt;/P&gt;&lt;P&gt;EID,EffectiveTime,OrderHistory,dummy_col,Period_Start_Date&lt;/P&gt;&lt;P&gt;11,2019-04-19T02:50:42.6918667Z,"[{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-2'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T00:35:41Z', 'ResultValue': '500', 'ResultUnit': 'mg/dL', 'ReferenceRange': '&amp;lt;10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-2'}], 'ComponentName': 'component_2', 'EffectiveDateTime': '2019-04-19T00:35:41Z', 'ResultValue': '2.2', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '11-2'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1038007636', 'Narrative': '', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-8'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-3'}], 'ComponentName': 'component_3', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '2.9', 'ResultUnit': 'K/uL', 'ReferenceRange': '4.0 - 11.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '30', 'ResultUnit': 'mg/dL', 'ReferenceRange': '&amp;lt;10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-4'}], 'ComponentName': 'component_4', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '11.7', 'ResultUnit': '%', 'ReferenceRange': '&amp;lt;16.8 %', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-5'}], 'ComponentName': 'component_5', 'EffectiveDateTime': '2019-04-19T00:15:46Z', 'ResultValue': '1.2', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 
'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '11-8'}], 'EffectiveDateTime': '2019-04-18T23:48:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1038007637', 'Narrative': 'narrative_1', 'Impression': ''}]",dummy_value_1,2019-04-18T23:29:27+00:00&lt;/P&gt;&lt;P&gt;12,2019-04-19T02:50:42.5618605Z,"[{'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-7'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-6'}], 'ComponentName': 'component_6', 'EffectiveDateTime': '2019-04-18T04:01:57Z', 'ResultValue': '&amp;lt;1.4', 'ReferenceRange': '10.0 - 30.0 ug/mL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '298-7'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565237', 'Narrative': 'narrative_21', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-8'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-6'}], 'ComponentName': 'component_6', 'EffectiveDateTime': '2019-04-18T03:01:57Z', 'ResultValue': '&amp;lt;0.5', 'ReferenceRange': '10.0 - 30.0 ug/mL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T06:05:40Z', 'ResultValue': '20', 'ResultUnit': 'mg/dL', 'ReferenceRange': '&amp;lt;10 mg/dL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '5643'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565232', 'Narrative': '', 'Impression': ''}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-9'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'ComponentResults': [{'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-1'}], 'ComponentName': 'component_1', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '5', 'ResultUnit': 'mg/dL', 'ReferenceRange': '&amp;lt;10 mg/dL', 'Status': 'completed'}, {'Codes': 
[{'CodeSystem': 'sys_1', 'Code': '1-4'}], 'ComponentName': 'component_4', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '18', 'ResultUnit': '%', 'ReferenceRange': '&amp;lt;16.8 %', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '1-5'}], 'ComponentName': 'component_5', 'EffectiveDateTime': '2019-04-19T04:05:40Z', 'ResultValue': '100', 'ResultUnit': 'K/uL', 'ReferenceRange': '2.0 - 8.0 K/uL', 'Status': 'completed'}, {'Codes': [{'CodeSystem': 'sys_1', 'Code': '2-4'}], 'EffectiveDateTime': '2019-04-18T03:06:00Z', 'Status': 'completed'}], 'ReferenceId': 'Result1037565229', 'Narrative': 'narrative_22', 'Impression': ''}]",dummy_value_2,2019-04-19T13:29:27+00:00&lt;/P&gt;&lt;P&gt;&lt;/P&gt;&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 23 Sep 2021 15:41:37 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/pyspark-intermediate-dataframe-consumes-many-memory/m-p/14622#M9094</guid>
      <dc:creator>fymaterials_199</dc:creator>
      <dc:date>2021-09-23T15:41:37Z</dc:date>
    </item>
  </channel>
</rss>

