<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>Re: Getting SparkConnectGrpcException: (java.io.EOFException) error when using foreachBatch in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/90943#M38037</link>
    <description>&lt;DIV class=""&gt;&lt;DIV class=""&gt;Here is the full message&lt;/DIV&gt;&lt;DIV class=""&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;Exception has occurred: SparkConnectGrpcException
(java.io.IOException) Connection reset by peer
grpc._channel._MultiThreadedRendezvous: _MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.INTERNAL details = "Connection reset by peer" debug_error_string = "UNKNOWN:Error received from peer ipv4:44.234.192.44:443 {grpc_message:"Connection reset by peer", grpc_status:13, created_time:"2024-09-18T16:01:18.431172629+00:00"}" During handling of the above exception, another exception occurred: File "/workspaces/edge-datapipe-ods2/notebooks/edge_datapipe__main.py", line 108, in module .start() pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) Connection reset by peer&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
    <pubDate>Wed, 18 Sep 2024 16:07:50 GMT</pubDate>
    <dc:creator>seans</dc:creator>
    <dc:date>2024-09-18T16:07:50Z</dc:date>
    <item>
      <title>Getting SparkConnectGrpcException: (java.io.EOFException) error when using foreachBatch</title>
      <link>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/65207#M32751</link>
      <description>&lt;P&gt;Hello, I am trying to write a simple upsert statement following the steps in the tutorials. Here is what my code looks like:&lt;/P&gt;&lt;PRE&gt;from pyspark.sql import functions as F&lt;BR /&gt;&lt;BR /&gt;def upsert_source_one(self):&lt;BR /&gt;    df_source = spark.readStream.format("delta").table(self.source_one_bronze_table)&lt;BR /&gt;    df_targ_state = spark.read.format("delta").table(f"{self.silver_db}.state")&lt;BR /&gt;    df_delta = (df_source&lt;BR /&gt;                .join(df_targ_state, df_source.training_state == df_targ_state.id, "inner")  # inner join data&lt;BR /&gt;                .select(&lt;BR /&gt;                    F.col("badgenumber"),&lt;BR /&gt;                    F.col("name"),&lt;BR /&gt;                    F.col(f"{self.silver_db}.state.state_code").alias("training_state_id")&lt;BR /&gt;                )&lt;BR /&gt;                .withColumn("badgenumber", F.regexp_replace('badgenumber', '-', ''))  # clean up data&lt;BR /&gt;                .withColumn("badgenumber", F.regexp_replace('badgenumber', ' ', ''))&lt;BR /&gt;               )&lt;BR /&gt;&lt;BR /&gt;ws = (df_delta.writeStream&lt;BR /&gt;              .format("delta")&lt;BR /&gt;              .foreachBatch(upsertToDelta)&lt;BR /&gt;              .outputMode("update")&lt;BR /&gt;              .option("checkpointLocation", self.checkpointlocation)&lt;BR /&gt;      )&lt;BR /&gt;&lt;BR /&gt;streaming_query = ws.start()&lt;BR /&gt;streaming_query.awaitTermination(self.trigger_time_in_sec)&lt;BR /&gt;streaming_query.stop()&lt;/PRE&gt;&lt;PRE&gt;def upsertToDelta(microBatchOutputDF, batchId):&lt;BR /&gt;    # Set the dataframe to view name&lt;BR /&gt;    user_profile_df = DeltaTable.forName(microBatchOutputDF.sparkSession(), f"{self.silver_db}.user_profile")&lt;BR /&gt;    (&lt;BR /&gt;        user_profile_df.alias("up")&lt;BR /&gt;        .merge(microBatchOutputDF.alias("br"), "up.badge_number = br.badgenumber")&lt;BR /&gt;        .whenMatchedUpdate(&lt;BR /&gt;            set={"officer_name": "br.name",&lt;BR /&gt;                 "training_state_id": "br.training_state_id"&lt;BR /&gt;                }&lt;BR /&gt;        )&lt;BR /&gt;        .whenNotMatchedInsert(&lt;BR /&gt;            values={&lt;BR /&gt;                "badge_number": "br.badge_number",&lt;BR /&gt;                "officer_name": "br.name",&lt;BR /&gt;                "training_state_id": "br.training_state_id"&lt;BR /&gt;            }&lt;BR /&gt;        )&lt;BR /&gt;        .execute()&lt;BR /&gt;    )&lt;/PRE&gt;&lt;P&gt;I've trimmed the code down a bit, so the formatting may be slightly off. I am running on shared compute with runtime "15.0 (includes Apache Spark 3.5.0, Scala 2.12)". When I run this, it gets all the way to .start() but then fails with SparkConnectGrpcException: (java.io.EOFException). I can't find what I'm doing wrong, so any hints or suggestions would be helpful. My guess is that it has something to do with the runtime I'm using, but I don't want to just fall back to an old version.&lt;/P&gt;&lt;P&gt;Thanks!&lt;/P&gt;</description>
      <pubDate>Mon, 01 Apr 2024 17:58:31 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/65207#M32751</guid>
      <dc:creator>hyedesign</dc:creator>
      <dc:date>2024-04-01T17:58:31Z</dc:date>
    </item>
    <item>
      <title>Re: Getting SparkConnectGrpcException: (java.io.EOFException) error when using foreachBatch</title>
      <link>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/65312#M32781</link>
      <description>&lt;P&gt;Using sample data sets. Here is the full code. This error does seem to be related to runtime version 15:&lt;/P&gt;&lt;PRE&gt;df_source = spark.readStream.format("delta").table("`cat1`.`bronze`.`officer_info`")&lt;BR /&gt;df_orig_state = spark.read.format("delta").table("`sample-db`.`public`.state")&lt;BR /&gt;df_targ_state = spark.read.format("delta").table("`cat1`.`silver`.state")&lt;BR /&gt;df_delta = (df_source&lt;BR /&gt;            .join(df_orig_state, df_source.training_state == df_orig_state.id, "inner")&lt;BR /&gt;            .join(df_targ_state, df_orig_state.state_code == df_targ_state.state_code, "inner")&lt;BR /&gt;            .select(F.col("badgenumber"),&lt;BR /&gt;                    F.col("name"),&lt;BR /&gt;                    F.col("`cat1`.`silver`.state.id").alias("training_state_id"),&lt;BR /&gt;                    F.col("isVeteran").cast("boolean"),&lt;BR /&gt;                    F.col("cert_date").cast("date")&lt;BR /&gt;            )&lt;BR /&gt;            .withColumn("badgenumber", F.regexp_replace('badgenumber', '-', ''))&lt;BR /&gt;            .withColumn("badgenumber", F.regexp_replace('badgenumber', ' ', ''))&lt;BR /&gt;           )&lt;BR /&gt;&lt;BR /&gt;def upsertToDelta(microBatchOutputDF, batchId):&lt;BR /&gt;           officer_profile_df = DeltaTable.forName(microBatchOutputDF.sparkSession,&lt;BR /&gt;"`cat1`.`silver`.officer_profile")&lt;BR /&gt;           ( &lt;BR /&gt;             officer_profile_df&lt;BR /&gt;                 .alias("up")&lt;BR /&gt;                 .merge(microBatchOutputDF.alias("br"), "up.badge_number = br.badgenumber")&lt;BR /&gt;                 .whenMatchedUpdate(&lt;BR /&gt;                   set={&lt;BR /&gt;                        "officer_name": "br.name",&lt;BR /&gt;                        "certification_date": "br.cert_date",&lt;BR /&gt;                        "isVeteran": "br.isVeteran",&lt;BR /&gt;                        "training_state_id": 
"br.training_state_id"&lt;BR /&gt;                       }&lt;BR /&gt;                 )&lt;BR /&gt;                 .whenNotMatchedInsert(&lt;BR /&gt;                   values={&lt;BR /&gt;                           "up.badge_number": "br.badgenumber",&lt;BR /&gt;                           "up.officer_name": "br.name",&lt;BR /&gt;                           "up.training_state_id": "br.training_state_id",&lt;BR /&gt;                           "up.isVeteran": "br.isVeteran",&lt;BR /&gt;                           "up.certification_date": "br.cert_date"&lt;BR /&gt;                          }&lt;BR /&gt;                 ).execute()&lt;BR /&gt;            )&lt;BR /&gt;&lt;BR /&gt;ws = (df_delta.writeStream&lt;BR /&gt;              .format("delta")&lt;BR /&gt;              .foreachBatch(upsertToDelta)&lt;BR /&gt;              .outputMode("update")&lt;BR /&gt;              .option("checkpointLocation", "s3://somebucket/checkpoint/silver/source_one_test1")&lt;BR /&gt;)&lt;BR /&gt;&lt;BR /&gt;streaming_query = ws.trigger(availableNow=True).start()  # &amp;lt;~~~~ FAILS HERE on start() with pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.EOFException)&lt;BR /&gt;print(f"waiting for 30 seconds")&lt;BR /&gt;streaming_query.awaitTermination(30)&lt;BR /&gt;print("Stopping trigger...")&lt;BR /&gt;streaming_query.stop()&lt;BR /&gt;print(f"Upsert Complete: {datetime.now()}")&lt;BR /&gt;&lt;BR /&gt;&lt;BR /&gt;&lt;/PRE&gt;</description>
      <pubDate>Tue, 02 Apr 2024 14:05:30 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/65312#M32781</guid>
      <dc:creator>hyedesign</dc:creator>
      <dc:date>2024-04-02T14:05:30Z</dc:date>
    </item>
    <item>
      <title>Re: Getting SparkConnectGrpcException: (java.io.EOFException) error when using foreachBatch</title>
      <link>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/65359#M32790</link>
      <description>&lt;P&gt;One more bit of information that I just figured out: this seems to happen only when I run it via PyCharm. I'm not sure why that would trigger this error, though. Any help is greatly appreciated.&lt;/P&gt;</description>
      <pubDate>Wed, 03 Apr 2024 03:48:06 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/65359#M32790</guid>
      <dc:creator>hyedesign</dc:creator>
      <dc:date>2024-04-03T03:48:06Z</dc:date>
    </item>
    <item>
      <title>Re: Getting SparkConnectGrpcException: (java.io.EOFException) error when using foreachBatch</title>
      <link>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/89802#M37908</link>
      <description>&lt;P&gt;I'm getting this error also, in VS Code.&lt;/P&gt;</description>
      <pubDate>Fri, 13 Sep 2024 14:11:43 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/89802#M37908</guid>
      <dc:creator>seans</dc:creator>
      <dc:date>2024-09-13T14:11:43Z</dc:date>
    </item>
    <item>
      <title>Re: Getting SparkConnectGrpcException: (java.io.EOFException) error when using foreachBatch</title>
      <link>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/89804#M37909</link>
      <description>&lt;P&gt;I was getting errors trying to include the code. Here is my eighth attempt:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from functools import partial

for_each_batch_partial = partial(
    for_each_batch,
    spark=spark,
    environment=config.environment,
    kinesis_options=config.kinesis_options,
    mongo_options=config.mongo_options,
    mock_target=config.mock_target,
    collection_schemas=create_collection_schema(spark, EVENT_TO_COLLECTION),
    log_level=config.log_level
)
query = (
    spark.readStream.format("kinesis")
    .options(**config.kinesis_options)
    .load()
    .writeStream.queryName("datapipe")
    .option("checkpointLocation", config.checkpoint_path)
    .foreachBatch(for_each_batch_partial)
    .start()
)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Fri, 13 Sep 2024 14:17:50 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/89804#M37909</guid>
      <dc:creator>seans</dc:creator>
      <dc:date>2024-09-13T14:17:50Z</dc:date>
    </item>
    <item>
      <title>Re: Getting SparkConnectGrpcException: (java.io.EOFException) error when using foreachBatch</title>
      <link>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/89808#M37910</link>
      <description>&lt;P&gt;I figured out that the error is hiding an underlying issue in the code, which you can get to if you deploy the bundle (if you are using asset bundles) and run it from a notebook in a browser.&lt;/P&gt;&lt;P&gt;So the issue is really that the debugger cannot stop on an exception thrown by the user function passed to foreachBatch on the streaming query.&lt;/P&gt;</description>
      <pubDate>Fri, 13 Sep 2024 14:52:24 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/89808#M37910</guid>
      <dc:creator>seans</dc:creator>
      <dc:date>2024-09-13T14:52:24Z</dc:date>
    </item>
    <item>
      <title>Re: Getting SparkConnectGrpcException: (java.io.EOFException) error when using foreachBatch</title>
      <link>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/90943#M38037</link>
      <description>&lt;DIV class=""&gt;&lt;DIV class=""&gt;Here is the full message&lt;/DIV&gt;&lt;DIV class=""&gt;&amp;nbsp;&lt;/DIV&gt;&lt;/DIV&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;Exception has occurred: SparkConnectGrpcException
(java.io.IOException) Connection reset by peer
grpc._channel._MultiThreadedRendezvous: _MultiThreadedRendezvous of RPC that terminated with: status = StatusCode.INTERNAL details = "Connection reset by peer" debug_error_string = "UNKNOWN:Error received from peer ipv4:44.234.192.44:443 {grpc_message:"Connection reset by peer", grpc_status:13, created_time:"2024-09-18T16:01:18.431172629+00:00"}" During handling of the above exception, another exception occurred: File "/workspaces/edge-datapipe-ods2/notebooks/edge_datapipe__main.py", line 108, in module .start() pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) Connection reset by peer&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 18 Sep 2024 16:07:50 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/getting-sparkconnectgrpcexception-java-io-eofexception-error/m-p/90943#M38037</guid>
      <dc:creator>seans</dc:creator>
      <dc:date>2024-09-18T16:07:50Z</dc:date>
    </item>
  </channel>
</rss>

