Spark Streaming job gets stuck in the "Stream Initializing" stage

hao-uit
New Contributor

Hello all,

I am having an issue with my Spark Streaming job: it is stuck at the "Stream Initializing" stage.

I need your help to understand what is happening inside the "Stream Initializing" stage and why it takes so long. Here are some more details:

1. This streaming job (where we are having the issue) reads data from a bronze table and inserts that data into a silver table.

2. I am using the CDF (Change Data Feed) mechanism, and currently the bronze data read via readStream has around 200 new unprocessed records.

Here is my code:

from pyspark.sql.functions import col
from delta.tables import DeltaTable

def process_cdc(batch_df, batch_id):
    # if batch_df.isEmpty():
    #     return
    batch_df.show()
    print('haohaoprocess batch id: ', batch_id)
    print(batch_df.count())
    # Split inserts/updates and deletes
    upserts_df = batch_df.filter(col("_change_type").isin("insert", "update_postimage"))
    print('upserts_df.count(): ', upserts_df.count())
    deletes_df = batch_df.filter(col("_change_type") == "delete")
    print('deletes_df.count(): ', deletes_df.count())
    # Clean data
    cleaned_upserts = handle_data(upserts_df)
    # Load silver table
    silver_table = DeltaTable.forPath(spark, silver_full_path)
    # Upsert inserts/updates into silver
    silver_table.alias("silver").merge(
        cleaned_upserts.alias("bronze"),
        "silver.primary_key = bronze.primary_key"
    ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
    # Apply deletes to silver
    silver_table.alias("silver").merge(
        deletes_df.alias("bronze"),
        "silver.primary_key = bronze.primary_key"
    ).whenMatchedDelete().execute()

query = (
    spark.readStream
    .format("delta")
    .option("readChangeData", "true")  # read the Delta Change Data Feed
    .option("startingVersion", starting_version)
    .load(bronze_full_path)
    .writeStream
    .foreachBatch(process_cdc)
    .option("checkpointLocation", "abfss://...")
    .start()
)
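
One thing worth flagging in the code above: every action on batch_df (the show(), the three count() calls, and the two merges) can trigger a separate evaluation of the micro-batch, so the debug prints roughly double the per-batch work. This would not explain a stall before the first batch starts, but here is a minimal sketch of caching the batch once per invocation (reusing handle_data, spark, and silver_full_path from the post; the persist/unpersist guard is added for illustration):

from pyspark.sql.functions import col
from delta.tables import DeltaTable

def process_cdc_cached(batch_df, batch_id):
    # Cache the micro-batch so the filters and the two merges below
    # do not each re-trigger the upstream CDF read.
    batch_df.persist()
    try:
        upserts_df = batch_df.filter(col("_change_type").isin("insert", "update_postimage"))
        deletes_df = batch_df.filter(col("_change_type") == "delete")
        silver_table = DeltaTable.forPath(spark, silver_full_path)
        # Upsert inserts/updates into silver
        silver_table.alias("silver").merge(
            handle_data(upserts_df).alias("bronze"),
            "silver.primary_key = bronze.primary_key"
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
        # Apply deletes to silver
        silver_table.alias("silver").merge(
            deletes_df.alias("bronze"),
            "silver.primary_key = bronze.primary_key"
        ).whenMatchedDelete().execute()
    finally:
        # Release the cached batch even if a merge fails.
        batch_df.unpersist()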
1 REPLY

nikhilj0421
Databricks Employee

Hi @hao-uit, do you see any load on the driver, and anything in the event logs?

Also, which libraries do you have installed on your cluster?
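
For reference, the streaming query handle itself reports which phase the stream is in; here is a minimal sketch, assuming the query variable from the original post:

# status reports the current phase, e.g. "Initializing sources".
print(query.status)

# lastProgress stays None until the first micro-batch completes,
# which confirms whether the job is still stuck before batch 0.
print(query.lastProgress)

# recentProgress lists progress reports for the most recent batches
# (empty while the stream is still initializing).
for progress in query.recentProgress:
    print(progress["batchId"], progress["numInputRows"])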