Getting SparkConnectGrpcException: (java.io.EOFException) error when using foreachBatch

hyedesign
New Contributor II

Hello, I am trying to write a simple upsert statement following the steps in the tutorials. Here is what my code looks like:

from pyspark.sql import functions as F
from delta.tables import DeltaTable

def upsert_source_one(self):
    df_source = spark.readStream.format("delta").table(self.source_one_bronze_table)
    df_targ_state = spark.read.format("delta").table(f"{self.silver_db}.state")
    df_delta = (df_source
        .join(df_targ_state, df_source.training_state == df_targ_state.id, "inner")  # inner join data
        .select(
            F.col("badgenumber"),
            F.col("name"),
            F.col(f"{self.silver_db}.state.state_code").alias("training_state_id")
        )
        .withColumn("badgenumber", F.regexp_replace('badgenumber', '-', ''))  # clean up data
        .withColumn("badgenumber", F.regexp_replace('badgenumber', ' ', ''))
    )
    ws = (df_delta.writeStream
        .format("delta")
        .foreachBatch(upsertToDelta)
        .outputMode("update")
        .option("checkpointLocation", self.checkpointlocation)
    )

    streaming_query = ws.start()
    streaming_query.awaitTermination(self.trigger_time_in_sec)
    streaming_query.stop()

def upsertToDelta(microBatchOutputDF, batchId):
    # Get a handle to the target Delta table for this micro-batch
    user_profile_df = DeltaTable.forName(microBatchOutputDF.sparkSession,
                                         f"{self.silver_db}.user_profile")
    (
        user_profile_df.alias("up")
        .merge(microBatchOutputDF.alias("br"), "up.badge_number = br.badgenumber")
        .whenMatchedUpdate(
            set={
                "officer_name": "br.name",
                "training_state_id": "br.training_state_id"
            }
        )
        .whenNotMatchedInsert(
            values={
                "badge_number": "br.badgenumber",
                "officer_name": "br.name",
                "training_state_id": "br.training_state_id"
            }
        )
        .execute()
    )

I've trimmed the code down a bit from what I actually have, so the formatting may be slightly off.
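One thing I may have mangled when trimming: upsertToDelta refers to self.silver_db, so in the real code it is bound to the same class. A minimal sketch of that shape (illustrative names, not my exact code):

from delta.tables import DeltaTable

class SilverUpserter:
    def __init__(self, silver_db, checkpoint_location):
        # Keep only plain, picklable attributes here: under Spark Connect,
        # foreachBatch serializes the handler and everything it closes
        # over, including self.
        self.silver_db = silver_db
        self.checkpoint_location = checkpoint_location

    def upsert_to_delta(self, micro_batch_df, batch_id):
        # Bound method: foreachBatch supplies (df, batch_id); self comes along.
        target = DeltaTable.forName(micro_batch_df.sparkSession,
                                    f"{self.silver_db}.user_profile")
        (target.alias("up")
            .merge(micro_batch_df.alias("br"), "up.badge_number = br.badgenumber")
            .whenMatchedUpdate(set={"officer_name": "br.name"})
            .whenNotMatchedInsert(values={"badge_number": "br.badgenumber",
                                          "officer_name": "br.name"})
            .execute())

    def start(self, source_df):
        return (source_df.writeStream
                .foreachBatch(self.upsert_to_delta)  # bound method, not a bare function
                .outputMode("update")
                .option("checkpointLocation", self.checkpoint_location)
                .start())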

I am running on shared compute with runtime "15.0 (includes Apache Spark 3.5.0, Scala 2.12)". When I run this, it gets all the way to .start() but then fails with a SparkConnectGrpcException: (java.io.EOFException) error.
I can't figure out what I'm doing wrong, so any hints or suggestions would be helpful. My guess is that it has something to do with the runtime I'm using, but I don't want to just fall back to an older version.

Thanks!

3 REPLIES

Kaniz
Community Manager

Hi @hyedesign

  • It seems your code snippet got cut off at the end. If you have more code, please provide the complete snippet.

hyedesign
New Contributor II

Using sample data sets, here is the full code. This error does seem to be related to runtime version 15.

from datetime import datetime
from pyspark.sql import functions as F
from delta.tables import DeltaTable

df_source = spark.readStream.format("delta").table("`cat1`.`bronze`.`officer_info`")
df_orig_state = spark.read.format("delta").table("`sample-db`.`public`.state")
df_targ_state = spark.read.format("delta").table("`cat1`.`silver`.state")
df_delta = (df_source
    .join(df_orig_state, df_source.training_state == df_orig_state.id, "inner")
    .join(df_targ_state, df_orig_state.state_code == df_targ_state.state_code, "inner")
    .select(
        F.col("badgenumber"),
        F.col("name"),
        F.col("`cat1`.`silver`.state.id").alias("training_state_id"),
        F.col("isVeteran").cast("boolean"),
        F.col("cert_date").cast("date")
    )
    .withColumn("badgenumber", F.regexp_replace('badgenumber', '-', ''))
    .withColumn("badgenumber", F.regexp_replace('badgenumber', ' ', ''))
)

def upsertToDelta(microBatchOutputDF, batchId):
    # Get a handle to the target Delta table for this micro-batch
    officer_profile_df = DeltaTable.forName(microBatchOutputDF.sparkSession,
                                            "`cat1`.`silver`.officer_profile")
    (
        officer_profile_df
        .alias("up")
        .merge(microBatchOutputDF.alias("br"), "up.badge_number = br.badgenumber")
        .whenMatchedUpdate(
            set={
                "officer_name": "br.name",
                "certification_date": "br.cert_date",
                "isVeteran": "br.isVeteran",
                "training_state_id": "br.training_state_id"
            }
        )
        .whenNotMatchedInsert(
            values={
                "badge_number": "br.badgenumber",
                "officer_name": "br.name",
                "training_state_id": "br.training_state_id",
                "isVeteran": "br.isVeteran",
                "certification_date": "br.cert_date"
            }
        ).execute()
    )

ws = (df_delta.writeStream
    .format("delta")
    .foreachBatch(upsertToDelta)
    .outputMode("update")
    .option("checkpointLocation", "s3://somebucket/checkpoint/silver/source_one_test1")
)

streaming_query = ws.trigger(availableNow=True).start()  # <~~~~ FAILS HERE on start() with pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.EOFException)
print("Waiting for 30 seconds...")
streaming_query.awaitTermination(30)
print("Stopping trigger...")
streaming_query.stop()
print(f"Upsert Complete: {datetime.now()}")


hyedesign
New Contributor II

One more bit of information that I just figured out: this only seems to happen when I run the code via PyCharm. I'm not sure why that would cause this error, though. Any help is greatly appreciated.
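In case it's relevant, the PyCharm run goes through Databricks Connect (a Spark Connect client), so the first thing I'm checking is whether the client version lines up with the cluster runtime (assuming databricks-connect is the package in use):

from importlib.metadata import version

# Databricks' guidance is to keep the databricks-connect client on the
# same major.minor version as the cluster's runtime (15.x here).
print("databricks-connect client:", version("databricks-connect"))
print("server Spark version:", spark.version)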