Getting SparkConnectGrpcException: (java.io.EOFException) error when using foreachBatch
04-01-2024 10:58 AM
Hello, I am trying to write a simple upsert statement following the steps in the tutorials. Here is what my code looks like:
from pyspark.sql import functions as F

def upsert_source_one(self):
    df_source = spark.readStream.format("delta").table(self.source_one_bronze_table)
    df_targ_state = spark.read.format("delta").table(f"{self.silver_db}.state")

    df_delta = (df_source
        .join(df_targ_state, df_source.training_state == df_targ_state.id, "inner")  # inner join data
        .select(
            F.col("badgenumber"),
            F.col("name"),
            F.col(f"{self.silver_db}.state.state_code").alias("training_state_id")
        )
        .withColumn("badgenumber", F.regexp_replace('badgenumber', '-', ''))  # clean up data
        .withColumn("badgenumber", F.regexp_replace('badgenumber', ' ', ''))
    )

    ws = (df_delta.writeStream
        .format("delta")
        .foreachBatch(upsertToDelta)
        .outputMode("update")
        .option("checkpointLocation", self.checkpointlocation)
    )

    streaming_query = ws.start()
    streaming_query.awaitTermination(self.trigger_time_in_sec)
    streaming_query.stop()
def upsertToDelta(microBatchOutputDF, batchId):
    # Needed for DeltaTable.forName below
    from delta.tables import DeltaTable

    # Look up the target Delta table by name
    user_profile_df = DeltaTable.forName(microBatchOutputDF.sparkSession,
                                         f"{self.silver_db}.user_profile")
    (
        user_profile_df.alias("up")
        .merge(microBatchOutputDF.alias("br"), "up.badge_number = br.badgenumber")
        .whenMatchedUpdate(
            set={
                "officer_name": "br.name",
                "training_state_id": "br.training_state_id"
            }
        )
        .whenNotMatchedInsert(
            values={
                "badge_number": "br.badge_number",
                "officer_name": "br.name",
                "training_state_id": "br.training_state_id"
            }
        )
        .execute()
    )
I've trimmed the code down a bit from what I actually have, so the formatting may be slightly off.
I am running on shared compute with runtime "15.0 (includes Apache Spark 3.5.0, Scala 2.12)". When I run this, it gets all the way to .start() but then fails with a SparkConnectGrpcException: (java.io.EOFException) error.
I can't figure out what I'm doing wrong, so any hints or suggestions would be helpful. My guess is that it has something to do with the runtime I'm using, but I don't want to just drop back to an older version.
Thanks!
Labels:
- Delta Lake
- Spark
04-02-2024 07:05 AM
I'm using sample data sets; here is the full code. This error does seem to be related to runtime version 15.
df_source = spark.readStream.format("delta").table("`cat1`.`bronze`.`officer_info`")
df_orig_state = spark.read.format("delta").table("`sample-db`.`public`.state")
df_targ_state = spark.read.format("delta").table("`cat1`.`silver`.state")

df_delta = (df_source
    .join(df_orig_state, df_source.training_state == df_orig_state.id, "inner")
    .join(df_targ_state, df_orig_state.state_code == df_targ_state.state_code, "inner")
    .select(
        F.col("badgenumber"),
        F.col("name"),
        F.col("`cat1`.`silver`.state.id").alias("training_state_id"),
        F.col("isVeteran").cast("boolean"),
        F.col("cert_date").cast("date")
    )
    .withColumn("badgenumber", F.regexp_replace('badgenumber', '-', ''))
    .withColumn("badgenumber", F.regexp_replace('badgenumber', ' ', ''))
)
def upsertToDelta(microBatchOutputDF, batchId):
    officer_profile_df = DeltaTable.forName(microBatchOutputDF.sparkSession,
                                            "`cat1`.`silver`.officer_profile")
    (
        officer_profile_df
        .alias("up")
        .merge(microBatchOutputDF.alias("br"), "up.badge_number = br.badgenumber")
        .whenMatchedUpdate(
            set={
                "officer_name": "br.name",
                "certification_date": "br.cert_date",
                "isVeteran": "br.isVeteran",
                "training_state_id": "br.training_state_id"
            }
        )
        .whenNotMatchedInsert(
            values={
                "up.badge_number": "br.badgenumber",
                "up.officer_name": "br.name",
                "up.training_state_id": "br.training_state_id",
                "up.isVeteran": "br.isVeteran",
                "up.certification_date": "br.cert_date"
            }
        )
        .execute()
    )
ws = (df_delta.writeStream
    .format("delta")
    .foreachBatch(upsertToDelta)
    .outputMode("update")
    .option("checkpointLocation", "s3://somebucket/checkpoint/silver/source_one_test1")
)

streaming_query = ws.trigger(availableNow=True).start()  # <~~~~ FAILS HERE on start() with pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.EOFException)

print("waiting for 30 seconds")
streaming_query.awaitTermination(30)
print("Stopping trigger...")
streaming_query.stop()
print(f"Upsert Complete: {datetime.now()}")
04-02-2024 08:48 PM
One more bit of information that I just figured out: this seems to happen only when I run this via PyCharm. Not sure why that would produce this error, though. Any help is greatly appreciated.
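Since IDE runs go through the databricks-connect Spark Connect client rather than a notebook kernel, one hedged guess is a client/server version mismatch: Databricks' guidance, as I understand it, is that the installed databricks-connect version should line up with the cluster's runtime version. A quick way to check the local client (assuming the standard databricks-connect setup):

# Prints the installed Spark Connect client version, which should line up
# with the cluster's DBR version (e.g. a 15.x client for a DBR 15.x cluster).
from importlib.metadata import version

print(version("databricks-connect"))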
09-13-2024 07:11 AM
I'm getting this error also, in VS Code.
09-13-2024 07:17 AM
I was getting errors trying to include the code; here is my eighth attempt:
from functools import partial  # needed for for_each_batch_partial below

for_each_batch_partial = partial(
    for_each_batch,
    spark=spark,
    environment=config.environment,
    kinesis_options=config.kinesis_options,
    mongo_options=config.mongo_options,
    mock_target=config.mock_target,
    collection_schemas=create_collection_schema(spark, EVENT_TO_COLLECTION),
    log_level=config.log_level,
)

query = (
    spark.readStream.format("kinesis")
    .options(**config.kinesis_options)
    .load()
    .writeStream.queryName("datapipe")
    .option("checkpointLocation", config.checkpoint_path)
    .foreachBatch(for_each_batch_partial)
    .start()
)
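A note on the partial above (a hedged guess, not a confirmed diagnosis): under Spark Connect, the function handed to foreachBatch is serialized with cloudpickle and shipped to the server, and a functools.partial is serialized together with every argument bound into it. Binding the client-side spark session, as this code does, is a plausible failure point, since a live session holding a gRPC channel generally cannot be pickled. A minimal way to test serializability locally, where for_each_batch and its arguments are hypothetical stand-ins:

from functools import partial

import cloudpickle  # ships with pyspark; what Spark Connect uses to send the function

def for_each_batch(batch_df, batch_id, environment):
    pass  # real batch logic would go here

bound = partial(for_each_batch, environment="dev")
cloudpickle.dumps(bound)  # raises immediately if any bound argument is not serializable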
09-13-2024 07:52 AM
I figured out that the error is hiding an underlying issue in the code, which you can get to if you deploy the bundle (if you are using asset bundles) and run it from a notebook in a browser.
So the issue is really that the debugger cannot stop on an exception thrown by the user function passed to foreachBatch on the streaming query.
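One way to surface that hidden exception without switching to a browser notebook is to wrap the batch function so the real traceback is printed before the failure propagates back as a gRPC error. A minimal sketch, where for_each_batch stands in for the actual user function:

import traceback

def for_each_batch_safe(batch_df, batch_id):
    # Log the full traceback from the batch function before re-raising,
    # since the client often only sees an opaque SparkConnectGrpcException.
    try:
        for_each_batch(batch_df, batch_id)
    except Exception:
        print(f"foreachBatch failed on batch {batch_id}:")
        traceback.print_exc()
        raise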
09-18-2024 09:07 AM
Exception has occurred: SparkConnectGrpcException
(java.io.IOException) Connection reset by peer
grpc._channel._MultiThreadedRendezvous: <_MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.INTERNAL
    details = "Connection reset by peer"
    debug_error_string = "UNKNOWN:Error received from peer ipv4:44.234.192.44:443 {grpc_message:"Connection reset by peer", grpc_status:13, created_time:"2024-09-18T16:01:18.431172629+00:00"}"
>

During handling of the above exception, another exception occurred:

File "/workspaces/edge-datapipe-ods2/notebooks/edge_datapipe__main.py", line 108, in <module>
    .start()
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) Connection reset by peer

