<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Driver: Out of Memory in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/driver-out-of-memory/m-p/81013#M36196</link>
    <description>&lt;P&gt;Does your merge_stream function contain any stateful operations, such as aggregation or deduplication logic? If so, your streaming job may be accumulating state in memory over time, which will eventually result in an OOM error. If this is the case, you will see memory usage climb steadily until it reaches the amount allocated to your cluster (you can view memory utilization in the "Metrics" tab for your cluster).&lt;/P&gt;&lt;P&gt;To resolve the issue, you could try adding a watermark to your upsert streaming query (replace event_time with your own event-time column):&lt;/P&gt;&lt;P&gt;(&lt;BR /&gt;spark&lt;BR /&gt;.readStream&lt;BR /&gt;.format("delta")&lt;BR /&gt;.table(source_table)&lt;BR /&gt;.withColumn("key", F.from_json(F.col("key"), key_schema))&lt;BR /&gt;.withColumn("value", F.from_json(F.col("value"), value_schema))&lt;BR /&gt;.withColumn("landing_time", F.col("_ingested_time"))&lt;BR /&gt;.withWatermark("event_time", "1 hour")&lt;BR /&gt;.writeStream&lt;BR /&gt;.foreachBatch(merge_stream)&lt;BR /&gt;.option("checkpointLocation", checkpoint_location)&lt;BR /&gt;.start()&lt;BR /&gt;)&lt;/P&gt;&lt;P&gt;Applying a watermark lets Spark discard state older than the threshold, which limits the memory used by your streaming job and can also improve performance by keeping the stream state at a manageable size. Set the lateness threshold according to your requirements. For example, if your stream contains deduplication logic and you expect duplicate records to arrive no more than 10 minutes apart, you could set your watermark threshold to 10 minutes.&lt;/P&gt;&lt;P&gt;Check out this documentation for more info on watermarks:&amp;nbsp;&lt;A href="https://docs.databricks.com/en/structured-streaming/watermarks.html" target="_blank"&gt;https://docs.databricks.com/en/structured-streaming/watermarks.html&lt;/A&gt;&lt;/P&gt;</description>
    <pubDate>Mon, 29 Jul 2024 18:46:07 GMT</pubDate>
    <dc:creator>xorbix_rshiva</dc:creator>
    <dc:date>2024-07-29T18:46:07Z</dc:date>
    <item>
      <title>Driver: Out of Memory</title>
      <link>https://community.databricks.com/t5/data-engineering/driver-out-of-memory/m-p/80935#M36166</link>
      <description>&lt;P&gt;Hi everyone,&lt;/P&gt;&lt;P&gt;I have a streaming job with 29 notebooks that runs continuously. Initially, I allocated 28 GB of memory to the driver, but the job failed with a "Driver Out of Memory" error after 4 hours of execution.&lt;/P&gt;&lt;P&gt;To address this, I increased the driver's memory to 56 GB, but the job still failed after running for 60 hours.&lt;/P&gt;&lt;P&gt;Here's the code I’m using to ingest data from Kafka:&lt;/P&gt;&lt;P&gt;&lt;EM&gt;(&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;raw_kafka_events&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.writeStream&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.format('delta')&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.option('key.converter', 'org.apache.kafka.connect.json.ByteArrayConverter')&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.option('value.converter', 'org.apache.kafka.connect.json.ByteArrayConverter')&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.option('checkpointLocation', checkpoint_location)&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.outputMode('append')&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.toTable(table_path)&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;)&lt;/EM&gt;&lt;/P&gt;&lt;P&gt;And here's the code for upserting data from Kafka to the target table:&lt;/P&gt;&lt;P&gt;&lt;EM&gt;(&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;spark&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.readStream&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.format("delta")&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.table(source_table)&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.withColumn("key", F.from_json(F.col("key"), key_schema))&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.withColumn("value", F.from_json(F.col("value"), value_schema))&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.withColumn("landing_time", F.col("_ingested_time"))&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.writeStream&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.foreachBatch(merge_stream)&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.option("checkpointLocation", checkpoint_location)&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;.start()&lt;/EM&gt;&lt;BR /&gt;&lt;EM&gt;)&lt;/EM&gt;&lt;/P&gt;&lt;P&gt;Despite these configurations, the driver continues to run out of memory. Any suggestions on how to further address this issue?&lt;/P&gt;</description>
      <pubDate>Mon, 29 Jul 2024 04:59:46 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/driver-out-of-memory/m-p/80935#M36166</guid>
      <dc:creator>tramtran</dc:creator>
      <dc:date>2024-07-29T04:59:46Z</dc:date>
    </item>
    <item>
      <title>Re: Driver: Out of Memory</title>
      <link>https://community.databricks.com/t5/data-engineering/driver-out-of-memory/m-p/81013#M36196</link>
      <description>&lt;P&gt;Does your merge_stream function contain any stateful operations, such as aggregation or deduplication logic? If so, your streaming job may be accumulating state in memory over time, which will eventually result in an OOM error. If this is the case, you will see memory usage climb steadily until it reaches the amount allocated to your cluster (you can view memory utilization in the "Metrics" tab for your cluster).&lt;/P&gt;&lt;P&gt;To resolve the issue, you could try adding a watermark to your upsert streaming query (replace event_time with your own event-time column):&lt;/P&gt;&lt;P&gt;(&lt;BR /&gt;spark&lt;BR /&gt;.readStream&lt;BR /&gt;.format("delta")&lt;BR /&gt;.table(source_table)&lt;BR /&gt;.withColumn("key", F.from_json(F.col("key"), key_schema))&lt;BR /&gt;.withColumn("value", F.from_json(F.col("value"), value_schema))&lt;BR /&gt;.withColumn("landing_time", F.col("_ingested_time"))&lt;BR /&gt;.withWatermark("event_time", "1 hour")&lt;BR /&gt;.writeStream&lt;BR /&gt;.foreachBatch(merge_stream)&lt;BR /&gt;.option("checkpointLocation", checkpoint_location)&lt;BR /&gt;.start()&lt;BR /&gt;)&lt;/P&gt;&lt;P&gt;Applying a watermark lets Spark discard state older than the threshold, which limits the memory used by your streaming job and can also improve performance by keeping the stream state at a manageable size. Set the lateness threshold according to your requirements. For example, if your stream contains deduplication logic and you expect duplicate records to arrive no more than 10 minutes apart, you could set your watermark threshold to 10 minutes.&lt;/P&gt;&lt;P&gt;Check out this documentation for more info on watermarks:&amp;nbsp;&lt;A href="https://docs.databricks.com/en/structured-streaming/watermarks.html" target="_blank"&gt;https://docs.databricks.com/en/structured-streaming/watermarks.html&lt;/A&gt;&lt;/P&gt;</description>
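To make the state-eviction idea in the reply above concrete, here is a minimal pure-Python sketch of deduplication state bounded by a watermark. It is a toy model under stated assumptions, not Spark's implementation: the class name WatermarkedDeduplicator and its process method are hypothetical, and Spark advances its watermark between micro-batches rather than per record.

```python
from datetime import datetime, timedelta


class WatermarkedDeduplicator:
    """Toy model of deduplication state bounded by a watermark (hypothetical helper)."""

    def __init__(self, threshold):
        self.threshold = threshold    # lateness threshold, e.g. timedelta(minutes=10)
        self.max_event_time = None    # latest event time observed so far
        self.seen = {}                # key -> event time of the first record with that key

    def process(self, key, event_time):
        """Return True if the record should be kept, False if it is a duplicate or too late."""
        if self.max_event_time is None or event_time > self.max_event_time:
            self.max_event_time = event_time
        watermark = self.max_event_time - self.threshold

        # Evict keys older than the watermark; this is what keeps the state bounded.
        self.seen = {k: t for k, t in self.seen.items() if t >= watermark}

        if watermark > event_time:
            return False  # arrived later than the threshold allows
        if key in self.seen:
            return False  # duplicate within the watermark window
        self.seen[key] = event_time
        return True


if __name__ == "__main__":
    dedup = WatermarkedDeduplicator(timedelta(minutes=10))
    start = datetime(2024, 7, 29, 12, 0)
    # 1,000 distinct keys, one per minute: without eviction the state would hold all of them.
    for i in range(1000):
        dedup.process(f"key-{i}", start + timedelta(minutes=i))
    print(len(dedup.seen))
```

With a 10-minute threshold the dictionary only ever holds the keys from the most recent minutes, whereas a version without the eviction step would retain every key it has ever seen. The trade-off is that a duplicate arriving after its key has been evicted is no longer recognized, which is why the threshold should match how far apart duplicates can arrive.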
      <pubDate>Mon, 29 Jul 2024 18:46:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/driver-out-of-memory/m-p/81013#M36196</guid>
      <dc:creator>xorbix_rshiva</dc:creator>
      <dc:date>2024-07-29T18:46:07Z</dc:date>
    </item>
    <item>
      <title>Re: Driver: Out of Memory</title>
      <link>https://community.databricks.com/t5/data-engineering/driver-out-of-memory/m-p/81023#M36201</link>
      <description>&lt;P&gt;Thanks&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/94141"&gt;@xorbix_rshiva&lt;/a&gt;&amp;nbsp;for your reply.&lt;/P&gt;&lt;P&gt;There is deduplication logic in my code. Could I use &lt;STRONG&gt;_source_cdc_time&lt;/STRONG&gt; for the watermark in this case?&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;EM&gt;def&amp;nbsp;merge_stream(microBatchDF,&amp;nbsp;i):&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;microBatchDF.createOrReplaceTempView("vw_delta")&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;microBatchDF._jdf.sparkSession().sql("""&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;MERGE&amp;nbsp;INTO&amp;nbsp;customer&amp;nbsp;as&amp;nbsp;tg&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;USING&amp;nbsp;(&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;SELECT&amp;nbsp;*&amp;nbsp;FROM&amp;nbsp;(&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;SELECT&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;key.customerid,&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;key.customerkey,&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;value.after.customername,&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;--meta&amp;nbsp;data&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;value.op,&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;current_timestamp()&amp;nbsp;as&amp;nbsp;_ingested_time,&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;to_timestamp(value.source.ts_ms/1000)&amp;nbsp;as&amp;nbsp;_source_cdc_time,&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;to_timestamp(value.ts_ms/1000)&amp;nbsp;as&amp;nbsp;_kafka_created_time,&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;landing_time&amp;nbsp;as&amp;nbsp;_landing_time,&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;row_number()&amp;nbsp;over(PARTITION&amp;nbsp;BY&amp;nbsp;key.customerid,&amp;nbsp;key.customerkey&amp;nbsp;order&amp;nbsp;by&amp;nbsp;value.source.ts_ms&amp;nbsp;desc)&amp;nbsp;as&amp;nbsp;rank&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;FROM&amp;nbsp;vw_delta&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;WHERE&amp;nbsp;value.op&amp;nbsp;IS&amp;nbsp;NOT&amp;nbsp;NULL&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;)&amp;nbsp;as&amp;nbsp;t&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;WHERE&amp;nbsp;rank&amp;nbsp;=&amp;nbsp;1&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;)&amp;nbsp;as&amp;nbsp;src&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;ON&amp;nbsp;tg.customerid&amp;nbsp;=&amp;nbsp;src.customerid&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;AND&amp;nbsp;tg.customerkey&amp;nbsp;=&amp;nbsp;src.customerkey&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;WHEN&amp;nbsp;MATCHED&amp;nbsp;AND&amp;nbsp;src.op&amp;nbsp;=&amp;nbsp;'d'&amp;nbsp;THEN&amp;nbsp;DELETE&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;WHEN&amp;nbsp;MATCHED&amp;nbsp;AND&amp;nbsp;src.op&amp;nbsp;!=&amp;nbsp;'d'&amp;nbsp;THEN&amp;nbsp;UPDATE&amp;nbsp;SET&amp;nbsp;*&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;WHEN&amp;nbsp;NOT&amp;nbsp;MATCHED&amp;nbsp;AND&amp;nbsp;src.op&amp;nbsp;!=&amp;nbsp;'d'&amp;nbsp;THEN&amp;nbsp;INSERT&amp;nbsp;*&lt;/EM&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;EM&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;""")&lt;/EM&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Tue, 30 Jul 2024 00:33:24 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/driver-out-of-memory/m-p/81023#M36201</guid>
      <dc:creator>tramtran</dc:creator>
      <dc:date>2024-07-30T00:33:24Z</dc:date>
    </item>
    <item>
      <title>Re: Driver: Out of Memory</title>
      <link>https://community.databricks.com/t5/data-engineering/driver-out-of-memory/m-p/81148#M36240</link>
      <description>&lt;P&gt;It looks like&amp;nbsp;&lt;STRONG&gt;_source_cdc_time&lt;/STRONG&gt;&lt;SPAN&gt;&amp;nbsp;is the timestamp of when the CDC transaction occurred in your source system. This would be a good choice of timestamp column for your watermark, since you would be deduping values according to the time the transactions actually occurred, not the time at which they were ingested and processed in Databricks. Note that the column has to exist on the streaming DataFrame before withWatermark is called, so you would need to derive it (for example from value.source.ts_ms) ahead of foreachBatch rather than only inside merge_stream.&lt;/SPAN&gt;&lt;/P&gt;</description>
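The difference between ordering by source time and ordering by ingestion time can be sketched in plain Python. This is an illustration under stated assumptions, not the poster's pipeline: the records are made up, "ts_ms" stands in for value.source.ts_ms, "arrival" is a hypothetical ingestion sequence number, and latest_per_key mimics the row_number() ... rank = 1 pattern from merge_stream.

```python
from datetime import datetime, timezone

# Hypothetical, flattened CDC records. "ts_ms" stands in for value.source.ts_ms and
# "arrival" for the order in which the records were ingested.
records = [
    {"customerid": 1, "customername": "new name", "ts_ms": 1722300000000, "arrival": 1},
    {"customerid": 1, "customername": "old name", "ts_ms": 1722299000000, "arrival": 2},
]


def source_cdc_time(row):
    """Python counterpart of to_timestamp(ts_ms / 1000), expressed in UTC."""
    return datetime.fromtimestamp(row["ts_ms"] / 1000, tz=timezone.utc)


def latest_per_key(rows, order_by):
    """Keep one row per customerid: the one with the largest order_by(row) value."""
    latest = {}
    for row in rows:
        current = latest.get(row["customerid"])
        if current is None or order_by(row) > order_by(current):
            latest[row["customerid"]] = row
    return latest


by_source_time = latest_per_key(records, source_cdc_time)
by_arrival = latest_per_key(records, lambda row: row["arrival"])

print(by_source_time[1]["customername"])  # new name
print(by_arrival[1]["customername"])      # old name
```

When the older change arrives after the newer one, ordering by the source timestamp still keeps the most recent state of the customer, while ordering by arrival would overwrite it with stale data.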
      <pubDate>Tue, 30 Jul 2024 14:07:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/driver-out-of-memory/m-p/81148#M36240</guid>
      <dc:creator>xorbix_rshiva</dc:creator>
      <dc:date>2024-07-30T14:07:52Z</dc:date>
    </item>
  </channel>
</rss>

