Hello all,
I am having an issue with my Spark Streaming job. It is stuck at the "Stream Initializing" stage.
Need your help here to understand what is happening inside the "Stream Initializing" stage of Spark Streaming job which is taking so long. Here are some more details -
1. This streaming job (where we are having the issue) reads data from the bronze table and inserts that data into the silver table.
2. I am using the Change Data Feed (CDF) mechanism, and the bronze data currently read by the stream has around 200 new unprocessed records.
def process_cdc(batch_df, batch_id):
    """Apply one CDC micro-batch from the bronze stream to the silver Delta table.

    Inserts/updates (``insert`` / ``update_postimage`` change types) are cleaned via
    ``handle_data`` and merged (upsert) into silver on ``primary_key``; ``delete``
    rows are applied via a second merge that deletes matching silver rows.

    Args:
        batch_df: micro-batch DataFrame carrying a CDF ``_change_type`` column.
        batch_id: monotonically increasing micro-batch id supplied by Spark.
    """
    # Skip empty micro-batches entirely — avoids two pointless Delta merges.
    if batch_df.isEmpty():
        return

    # Cache the batch: it feeds one count action plus two merges below.
    # Without this, every action re-reads/recomputes the CDF source, which
    # made the original version (show() + three separate count() calls)
    # recompute the micro-batch several times over.
    batch_df.persist()
    try:
        print(f"process_cdc batch_id={batch_id}, rows={batch_df.count()}")

        # Split inserts/updates and deletes by CDF change type.
        upserts_df = batch_df.filter(
            col("_change_type").isin("insert", "update_postimage")
        )
        deletes_df = batch_df.filter(col("_change_type") == "delete")

        # Clean data before upserting into silver.
        cleaned_upserts = handle_data(upserts_df)

        # Load the silver table.
        silver_table = DeltaTable.forPath(spark, silver_full_path)

        # Upsert: update matching rows, insert new ones.
        silver_table.alias("silver").merge(
            cleaned_upserts.alias("bronze"),
            "silver.primary_key = bronze.primary_key"
        ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()

        # Delete: remove silver rows whose key appears in the delete set.
        silver_table.alias("silver").merge(
            deletes_df.alias("bronze"),
            "silver.primary_key = bronze.primary_key"
        ).whenMatchedDelete().execute()
    finally:
        batch_df.unpersist()
# Source: Change Data Feed of the bronze Delta table, starting from the
# given table version.
bronze_changes = (
    spark.readStream
    .format("delta")
    .option("readChangeData", "true")
    .option("startingVersion", starting_version)
    .load(bronze_full_path)
)

# Sink: hand each micro-batch to process_cdc; streaming progress is tracked
# in the checkpoint location.
query = (
    bronze_changes.writeStream
    .foreachBatch(process_cdc)
    .option("checkpointLocation", "abfss://...")
    .start()
)