Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Getting SparkConnectGrpcException: (java.io.EOFException) error when using foreachBatch

hyedesign
New Contributor II

Hello, I am trying to write a simple upsert statement following the steps in the tutorials. Here is what my code looks like:

from pyspark.sql import functions as F
from delta.tables import DeltaTable

def upsert_source_one(self):
    df_source = spark.readStream.format("delta").table(self.source_one_bronze_table)
    df_targ_state = spark.read.format("delta").table(f"{self.silver_db}.state")
    df_delta = (df_source
        .join(df_targ_state, df_source.training_state == df_targ_state.id, "inner")  # inner join data
        .select(
            F.col("badgenumber"),
            F.col("name"),
            F.col(f"{self.silver_db}.state.state_code").alias("training_state_id")
        )
        .withColumn("badgenumber", F.regexp_replace('badgenumber', '-', ''))  # clean up data
        .withColumn("badgenumber", F.regexp_replace('badgenumber', ' ', ''))
    )
    ws = (df_delta.writeStream
        .format("delta")
        .foreachBatch(upsertToDelta)
        .outputMode("update")
        .option("checkpointLocation", self.checkpointlocation)
    )

    streaming_query = ws.start()
    streaming_query.awaitTermination(self.trigger_time_in_sec)
    streaming_query.stop()

def upsertToDelta(microBatchOutputDF, batchId):
    # Look up the target Delta table through the micro-batch's Spark session
    user_profile_df = DeltaTable.forName(microBatchOutputDF.sparkSession, f"{self.silver_db}.user_profile")
    (
        user_profile_df.alias("up")
        .merge(microBatchOutputDF.alias("br"), "up.badge_number = br.badgenumber")
        .whenMatchedUpdate(
            set={
                "officer_name": "br.name",
                "training_state_id": "br.training_state_id"
            }
        )
        .whenNotMatchedInsert(
            values={
                "badge_number": "br.badgenumber",
                "officer_name": "br.name",
                "training_state_id": "br.training_state_id"
            }
        )
        .execute()
    )

I've trimmed the code down from what I actually have, so the formatting may be a bit off.

I am running on shared compute with a runtime of "15.0 (includes Apache Spark 3.5.0, Scala 2.12)". When I run this, it gets all the way to .start(), but then fails with SparkConnectGrpcException: (java.io.EOFException).
I am not able to find what I'm doing wrong, so any hints or suggestions would be helpful. My guess is that it has something to do with the runtime I'm using, but I don't want to just fall back to an older version.

Thanks!

6 REPLIES

hyedesign
New Contributor II

Here is the full code, using sample data sets. The error does seem to be related to runtime version 15:

from datetime import datetime
from delta.tables import DeltaTable
from pyspark.sql import functions as F

df_source = spark.readStream.format("delta").table("`cat1`.`bronze`.`officer_info`")
df_orig_state = spark.read.format("delta").table("`sample-db`.`public`.state")
df_targ_state = spark.read.format("delta").table("`cat1`.`silver`.state")
df_delta = (df_source
    .join(df_orig_state, df_source.training_state == df_orig_state.id, "inner")
    .join(df_targ_state, df_orig_state.state_code == df_targ_state.state_code, "inner")
    .select(
        F.col("badgenumber"),
        F.col("name"),
        F.col("`cat1`.`silver`.state.id").alias("training_state_id"),
        F.col("isVeteran").cast("boolean"),
        F.col("cert_date").cast("date")
    )
    .withColumn("badgenumber", F.regexp_replace('badgenumber', '-', ''))
    .withColumn("badgenumber", F.regexp_replace('badgenumber', ' ', ''))
)

def upsertToDelta(microBatchOutputDF, batchId):
    officer_profile_df = DeltaTable.forName(microBatchOutputDF.sparkSession,
                                            "`cat1`.`silver`.officer_profile")
    (
        officer_profile_df
        .alias("up")
        .merge(microBatchOutputDF.alias("br"), "up.badge_number = br.badgenumber")
        .whenMatchedUpdate(
            set={
                "officer_name": "br.name",
                "certification_date": "br.cert_date",
                "isVeteran": "br.isVeteran",
                "training_state_id": "br.training_state_id"
            }
        )
        .whenNotMatchedInsert(
            values={
                "up.badge_number": "br.badgenumber",
                "up.officer_name": "br.name",
                "up.training_state_id": "br.training_state_id",
                "up.isVeteran": "br.isVeteran",
                "up.certification_date": "br.cert_date"
            }
        ).execute()
    )

ws = (df_delta.writeStream
    .format("delta")
    .foreachBatch(upsertToDelta)
    .outputMode("update")
    .option("checkpointLocation", "s3://somebucket/checkpoint/silver/source_one_test1")
)

streaming_query = ws.trigger(availableNow=True).start() # <~~~~ FAILS HERE on start() with pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.EOFException)
print("waiting for 30 seconds")
streaming_query.awaitTermination(30)
print("Stopping trigger...")
streaming_query.stop()
print(f"Upsert Complete: {datetime.now()}")


hyedesign
New Contributor II

One more bit of information that I just figured out: this seems to happen only when I run the code via PyCharm. I'm not sure why that would cause this error, though. Any help is greatly appreciated.

seans
New Contributor III

I'm getting this error also, in VS Code.

 

seans
New Contributor III

Was getting errors trying to include the code. Here is my eighth attempt:

from functools import partial

for_each_batch_partial = partial(
    for_each_batch,
    spark=spark,
    environment=config.environment,
    kinesis_options=config.kinesis_options,
    mongo_options=config.mongo_options,
    mock_target=config.mock_target,
    collection_schemas=create_collection_schema(spark, EVENT_TO_COLLECTION),
    log_level=config.log_level
)
query = (
    spark.readStream.format("kinesis")
    .options(**config.kinesis_options)
    .load()
    .writeStream.queryName("datapipe")
    .option("checkpointLocation", config.checkpoint_path)
    .foreachBatch(for_each_batch_partial)
    .start()
)

 

seans
New Contributor III

I figured out that this error is hiding an underlying issue in the code. You can see the real error if you deploy the bundle (if you are using asset bundles) and run the code from a notebook in a browser.

So the issue is really that the debugger cannot stop on an exception thrown by the user function passed to foreachBatch on the streaming query.
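
One way to make that hidden exception visible might be to wrap the foreachBatch function so it logs its own traceback before re-raising. This is only a minimal sketch: `log_batch_errors` and the logger name `foreach_batch` are hypothetical names, not part of PySpark or Delta. It also assumes you can read the logs of whichever process actually runs the function. With Spark Connect that is likely the cluster side (for example the driver logs), not the local IDE console.

```python
import functools
import logging
import traceback


def log_batch_errors(batch_fn):
    """Wrap a foreachBatch-style function (hypothetical helper, not a Spark API).

    Any exception raised by batch_fn is logged with its full traceback and then
    re-raised, so the streaming query still fails as it normally would.
    """

    @functools.wraps(batch_fn)
    def wrapper(batch_df, batch_id, *args, **kwargs):
        try:
            return batch_fn(batch_df, batch_id, *args, **kwargs)
        except Exception:
            # Look up the logger at call time so the wrapper stays easy to serialize.
            logging.getLogger("foreach_batch").error(
                "foreachBatch function failed on batch %s:\n%s",
                batch_id,
                traceback.format_exc(),
            )
            raise

    return wrapper


# Assumed usage with the code from this thread:
#   .foreachBatch(log_batch_errors(upsertToDelta))
```

Because the wrapper re-raises, the query behaves the same as before. The only difference is that the original Python traceback is written to the log before the connection-level error appears on the client.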

seans
New Contributor III
Here is the full message:

Exception has occurred: SparkConnectGrpcException
(java.io.IOException) Connection reset by peer
grpc._channel._MultiThreadedRendezvous: _MultiThreadedRendezvous of RPC that terminated with:
    status = StatusCode.INTERNAL
    details = "Connection reset by peer"
    debug_error_string = "UNKNOWN:Error received from peer ipv4:44.234.192.44:443 {grpc_message:"Connection reset by peer", grpc_status:13, created_time:"2024-09-18T16:01:18.431172629+00:00"}"

During handling of the above exception, another exception occurred:

  File "/workspaces/edge-datapipe-ods2/notebooks/edge_datapipe__main.py", line 108, in <module>
    .start()
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) Connection reset by peer

 
