I am using sample data sets; the full code is below. The error does seem to be related to runtime version 15.
# Streaming read of the bronze officer records; the two state tables below are
# read as ordinary (non-streaming) DataFrames and joined onto each batch.
df_source = spark.readStream.format("delta").table("`cat1`.`bronze`.`officer_info`")
df_orig_state = spark.read.format("delta").table("`sample-db`.`public`.state")
df_targ_state = spark.read.format("delta").table("`cat1`.`silver`.state")

df_delta = (
    df_source
    # training_state is matched against the id of `sample-db`.`public`.state ...
    .join(df_orig_state, df_source.training_state == df_orig_state.id, "inner")
    # ... and that row is mapped to `cat1`.`silver`.state through state_code.
    .join(df_targ_state, df_orig_state.state_code == df_targ_state.state_code, "inner")
    .select(
        F.col("badgenumber"),
        F.col("name"),
        # Both state tables are joined on/expose an `id`, so the column is
        # taken from the target DataFrame itself (same style as the join
        # conditions above) instead of a backtick-qualified string name.
        df_targ_state["id"].alias("training_state_id"),
        F.col("isVeteran").cast("boolean"),
        F.col("cert_date").cast("date"),
    )
    # Strip hyphens and spaces from the badge number in a single regex pass.
    .withColumn("badgenumber", F.regexp_replace("badgenumber", "[- ]", ""))
)
def upsertToDelta(microBatchOutputDF, batchId):
    """Merge one micro-batch of officer rows into `cat1`.`silver`.officer_profile.

    Rows are matched on ``up.badge_number = br.badgenumber``. Matching target
    rows have their name, certification date, veteran flag and training state
    overwritten; source rows without a match are inserted as new rows.

    Args:
        microBatchOutputDF: DataFrame holding the current micro-batch. It must
            provide the columns ``badgenumber``, ``name``, ``cert_date``,
            ``isVeteran`` and ``training_state_id``.
        batchId: Identifier of the micro-batch; accepted for the callback
            signature but not used.
    """
    # The session is taken from the batch DataFrame rather than from a
    # module-level `spark` variable.
    officer_profile = DeltaTable.forName(
        microBatchOutputDF.sparkSession,
        "`cat1`.`silver`.officer_profile",
    )

    # NOTE(review): a Delta MERGE fails when several source rows match the
    # same target row. If one micro-batch can carry the same badgenumber more
    # than once, deduplicate it before merging -- TODO confirm with the data.
    (
        officer_profile.alias("up")
        .merge(microBatchOutputDF.alias("br"), "up.badge_number = br.badgenumber")
        # Keys are target column names; values are expressions over the source.
        .whenMatchedUpdate(
            set={
                "officer_name": "br.name",
                "certification_date": "br.cert_date",
                "isVeteran": "br.isVeteran",
                "training_state_id": "br.training_state_id",
            }
        )
        # Same bare target-column keys as the update clause, for consistency.
        .whenNotMatchedInsert(
            values={
                "badge_number": "br.badgenumber",
                "officer_name": "br.name",
                "training_state_id": "br.training_state_id",
                "isVeteran": "br.isVeteran",
                "certification_date": "br.cert_date",
            }
        )
        .execute()
    )
# Configure the streaming write. The sink is the foreachBatch callback, so no
# output format is set: each micro-batch is handed to upsertToDelta, which
# performs the Delta MERGE itself.
ws = (
    df_delta.writeStream
    .foreachBatch(upsertToDelta)
    .outputMode("update")
    .option("checkpointLocation", "s3://somebucket/checkpoint/silver/source_one_test1")
)

# NOTE: start() is where the reported failure surfaces:
# pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.EOFException)
streaming_query = ws.trigger(availableNow=True).start()

print("waiting for 30 seconds")
# awaitTermination(timeout) reports whether the query ended within the timeout.
finished = streaming_query.awaitTermination(30)

# stop() is still called so that a query which overran the timeout is shut down.
print("Stopping trigger...")
streaming_query.stop()

# Only report success when the query actually ended on its own; previously
# "Upsert Complete" was printed even if the run was cut off after 30 seconds.
if finished:
    print(f"Upsert Complete: {datetime.now()}")
else:
    print(f"Upsert stopped before completion (30 second timeout): {datetime.now()}")