<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Spark Streaming Job gets stuck in the "Stream Initializing" in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/spark-streaming-job-gets-stuck-in-the-quot-stream-initializing/m-p/119544#M45906</link>
    <description>&lt;P&gt;Hello all,&lt;/P&gt;&lt;P&gt;I am having an issue with my Spark Streaming job. It is stuck at the "Stream Initializing" stage.&lt;/P&gt;&lt;P&gt;I need your help to understand what is happening inside the "Stream Initializing" stage of the Spark Streaming job, which is taking so long. Here are some more details:&lt;/P&gt;&lt;P&gt;1. This streaming job (where we are having the issue) reads data from the bronze table and inserts that data into the silver table.&lt;/P&gt;&lt;P&gt;2. I am using the CDF (change data feed) mechanism, and currently the bronze data read out from read_stream has around 200 new unprocessed records.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;def process_cdc(batch_df, batch_id):
#    if batch_df.isEmpty():
#        return
   batch_df.show()
   print('haohaoprocess batch id: ', batch_id)
   print(batch_df.count())
   # Split insert/update và delete
   upserts_df = batch_df.filter(col("_change_type").isin("insert", "update_postimage"))
   print('upserts_df.count(): ',upserts_df.count())
   deletes_df = batch_df.filter(col("_change_type") == "delete")
   print('deletes_df.count(): ',deletes_df.count())
   # Clean data
   cleaned_upserts = handle_data(upserts_df)
   # Load Silver table
   silver_table = DeltaTable.forPath(spark, silver_full_path)
   # Upsert
   silver_table.alias("silver").merge(
       cleaned_upserts.alias("bronze"),
       "silver.primary_key = bronze.primary_key"
   ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
   # Delete
   silver_table.alias("silver").merge(
       deletes_df.alias("bronze"),
       "silver.primary_key = bronze.primary_key"
   ).whenMatchedDelete().execute()

query = (
   spark.readStream
   .format("delta")
   .option("readChangeData", "true")
   .option("startingVersion", starting_version)
   .load(bronze_full_path)
   .writeStream
   .foreachBatch(process_cdc)
   .option("checkpointLocation", "abfss://...")
   .start()
)&lt;/LI-CODE&gt;</description>
    <pubDate>Sun, 18 May 2025 08:31:04 GMT</pubDate>
    <dc:creator>hao-uit</dc:creator>
    <dc:date>2025-05-18T08:31:04Z</dc:date>
    <item>
      <title>Spark Streaming Job gets stuck in the "Stream Initializing"</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-streaming-job-gets-stuck-in-the-quot-stream-initializing/m-p/119544#M45906</link>
      <description>&lt;P&gt;Hello all,&lt;/P&gt;&lt;P&gt;I am having an issue with my Spark Streaming job. It is stuck at the "Stream Initializing" stage.&lt;/P&gt;&lt;P&gt;I need your help to understand what is happening inside the "Stream Initializing" stage of the Spark Streaming job, which is taking so long. Here are some more details:&lt;/P&gt;&lt;P&gt;1. This streaming job (where we are having the issue) reads data from the bronze table and inserts that data into the silver table.&lt;/P&gt;&lt;P&gt;2. I am using the CDF (change data feed) mechanism, and currently the bronze data read out from read_stream has around 200 new unprocessed records.&lt;/P&gt;&lt;LI-CODE lang="python"&gt;def process_cdc(batch_df, batch_id):
#    if batch_df.isEmpty():
#        return
   batch_df.show()
   print('haohaoprocess batch id: ', batch_id)
   print(batch_df.count())
   # Split insert/update và delete
   upserts_df = batch_df.filter(col("_change_type").isin("insert", "update_postimage"))
   print('upserts_df.count(): ',upserts_df.count())
   deletes_df = batch_df.filter(col("_change_type") == "delete")
   print('deletes_df.count(): ',deletes_df.count())
   # Clean data
   cleaned_upserts = handle_data(upserts_df)
   # Load Silver table
   silver_table = DeltaTable.forPath(spark, silver_full_path)
   # Upsert
   silver_table.alias("silver").merge(
       cleaned_upserts.alias("bronze"),
       "silver.primary_key = bronze.primary_key"
   ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
   # Delete
   silver_table.alias("silver").merge(
       deletes_df.alias("bronze"),
       "silver.primary_key = bronze.primary_key"
   ).whenMatchedDelete().execute()

query = (
   spark.readStream
   .format("delta")
   .option("readChangeData", "true")
   .option("startingVersion", starting_version)
   .load(bronze_full_path)
   .writeStream
   .foreachBatch(process_cdc)
   .option("checkpointLocation", "abfss://...")
   .start()
)&lt;/LI-CODE&gt;</description>
      <pubDate>Sun, 18 May 2025 08:31:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-streaming-job-gets-stuck-in-the-quot-stream-initializing/m-p/119544#M45906</guid>
      <dc:creator>hao-uit</dc:creator>
      <dc:date>2025-05-18T08:31:04Z</dc:date>
    </item>
    <item>
      <title>Re: Spark Streaming Job gets stuck in the "Stream Initializing"</title>
      <link>https://community.databricks.com/t5/data-engineering/spark-streaming-job-gets-stuck-in-the-quot-stream-initializing/m-p/120102#M46064</link>
      <description>&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/164824"&gt;@hao-uit&lt;/a&gt;, do you see any kind of load on the driver and event logs?&lt;/P&gt;
&lt;P&gt;Also, what libraries do you have installed on your cluster?&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Fri, 23 May 2025 17:04:57 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/spark-streaming-job-gets-stuck-in-the-quot-stream-initializing/m-p/120102#M46064</guid>
      <dc:creator>nikhilj0421</dc:creator>
      <dc:date>2025-05-23T17:04:57Z</dc:date>
    </item>
  </channel>
</rss>

