<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Error in Spark Streaming with foreachBatch and Databricks Connect in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/70216#M34031</link>
    <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/9"&gt;@Retired_mod&lt;/a&gt;&amp;nbsp;Notebooks in the Databricks workspace also work for me (that was never the problem).&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Locally in VS Code with Databricks Connect, it fails.&lt;/STRONG&gt;&lt;/P&gt;</description>
    <pubDate>Wed, 22 May 2024 05:21:55 GMT</pubDate>
    <dc:creator>TWib</dc:creator>
    <dc:date>2024-05-22T05:21:55Z</dc:date>
    <item>
      <title>Error in Spark Streaming with foreachBatch and Databricks Connect</title>
      <link>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/68843#M33755</link>
      <description>&lt;P&gt;The following code throws an error when run locally in my IDE with Databricks Connect.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS sample")
spark.sql("DROP TABLE IF EXISTS sample.mvp")
spark.sql("DROP TABLE IF EXISTS sample.mvp_from_foreach_batch")

data = [("John", "Doe", 30), ("Jane", "Doe", 25), ("Mike", "Johnson", 35)]
df = spark.createDataFrame(data, ["FirstName", "LastName", "Age"])
df.write.format("delta").mode("overwrite").saveAsTable("sample.mvp")

def foreach_batch_function(df, epoch_id):
    df.write.format("delta").mode("overwrite").saveAsTable(
        "sample.mvp_from_foreach_batch"
    )

spark.readStream.table("sample.mvp").writeStream.foreachBatch(
    foreach_batch_function
).outputMode("append").trigger(availableNow=True).start().awaitTermination()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;This code only works in notebooks or directly on a cluster. It does not run locally in an IDE with Databricks Connect.&lt;/P&gt;&lt;P&gt;Instead, the following error is raised:&lt;/P&gt;&lt;BLOCKQUOTE&gt;&lt;P&gt;pyspark.errors.exceptions.connect.SparkException: No PYTHON_UID found for session (some uid)&lt;/P&gt;&lt;/BLOCKQUOTE&gt;&lt;P&gt;In general, Databricks Connect works fine for all other cases.&lt;/P&gt;&lt;P&gt;My local environment:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;databricks-connect 14.3.1&lt;/LI&gt;&lt;LI&gt;databricks-sdk 0.26.0&lt;/LI&gt;&lt;LI&gt;pyspark 3.5.1&lt;/LI&gt;&lt;LI&gt;Python 3.11.4&lt;/LI&gt;&lt;/UL&gt;&lt;P&gt;Cluster configuration:&lt;/P&gt;&lt;UL&gt;&lt;LI&gt;&lt;SPAN&gt;15.1 (includes Apache Spark 3.5.0, Scala 2.12)&lt;/SPAN&gt;&lt;/LI&gt;&lt;LI&gt;&lt;SPAN&gt;Single User Mode&lt;/SPAN&gt;&lt;/LI&gt;&lt;/UL&gt;</description>
      <pubDate>Mon, 13 May 2024 05:40:15 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/68843#M33755</guid>
      <dc:creator>TWib</dc:creator>
      <dc:date>2024-05-13T05:40:15Z</dc:date>
    </item>
    <item>
      <title>Re: Error in Spark Streaming with foreachBatch and Databricks Connect</title>
      <link>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/70117#M34001</link>
      <description>&lt;P&gt;The only difference is Python 3.11.0 on the cluster vs. 3.11.4 locally. This shouldn't be an issue.&lt;/P&gt;&lt;P&gt;Does this code run for you?&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 21 May 2024 13:09:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/70117#M34001</guid>
      <dc:creator>TWib</dc:creator>
      <dc:date>2024-05-21T13:09:52Z</dc:date>
    </item>
    <item>
      <title>Re: Error in Spark Streaming with foreachBatch and Databricks Connect</title>
      <link>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/70216#M34031</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/9"&gt;@Retired_mod&lt;/a&gt;&amp;nbsp;Notebooks in the Databricks workspace also work for me (that was never the problem).&lt;/P&gt;&lt;P&gt;&lt;STRONG&gt;Locally in VS Code with Databricks Connect, it fails.&lt;/STRONG&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 22 May 2024 05:21:55 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/70216#M34031</guid>
      <dc:creator>TWib</dc:creator>
      <dc:date>2024-05-22T05:21:55Z</dc:date>
    </item>
    <item>
      <title>Re: Error in Spark Streaming with foreachBatch and Databricks Connect</title>
      <link>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/70843#M34164</link>
      <description>&lt;P&gt;One more finding: it seems to occur only on single-user clusters.&lt;/P&gt;</description>
      <pubDate>Tue, 28 May 2024 05:28:39 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/70843#M34164</guid>
      <dc:creator>TWib</dc:creator>
      <dc:date>2024-05-28T05:28:39Z</dc:date>
    </item>
    <item>
      <title>Re: Error in Spark Streaming with foreachBatch and Databricks Connect</title>
      <link>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/71130#M34250</link>
      <description>&lt;P&gt;This is still unresolved. Internally, we have dropped streaming for now because of the many problems; another support ticket is open.&lt;/P&gt;&lt;P&gt;For now, I do not recommend using streaming with foreachBatch if you want to use Databricks Connect.&lt;/P&gt;</description>
      <pubDate>Fri, 31 May 2024 05:27:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/71130#M34250</guid>
      <dc:creator>TWib</dc:creator>
      <dc:date>2024-05-31T05:27:07Z</dc:date>
    </item>
    <item>
      <title>Re: Error in Spark Streaming with foreachBatch and Databricks Connect</title>
      <link>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/95538#M39118</link>
      <description>&lt;P&gt;I'm experiencing the same issue.&lt;BR /&gt;Configuration:&lt;BR /&gt;- Cluster is on DBR 15.4 LTS (Spark 3.5.0)&lt;BR /&gt;- databricks-connect is version 15.4.2&lt;BR /&gt;&lt;BR /&gt;Sample code:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import sys
from databricks.connect import DatabricksSession

# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()

# Execute
df = spark.readStream.table("brz_stock_prices_job")

def update_metrics(batch_df, batch_id):
   size = batch_df.count()
   
writer = df.writeStream.foreachBatch(update_metrics).start()&lt;/LI-CODE&gt;&lt;P&gt;And the error that I get:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) Connection reset by peer

JVM stacktrace:
java.io.IOException
	at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:197)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
	at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
	at com.databricks.spark.connect.service.LocalCredentialsCache$.cacheCredentialsAndRun(LocalCredentialsCache.scala:226)
	at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:133)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3492)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2853)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:336)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:245)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:178)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:239)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:178)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:128)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:562)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)&lt;/LI-CODE&gt;&lt;P&gt;Given that foreachBatch gained Spark Connect support in 3.5.0, I would expect this to work. Any help would be appreciated!&lt;/P&gt;</description>
      <pubDate>Tue, 22 Oct 2024 15:58:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/95538#M39118</guid>
      <dc:creator>olivier-soucy</dc:creator>
      <dc:date>2024-10-22T15:58:07Z</dc:date>
    </item>
    <item>
      <title>Re: Error in Spark Streaming with foreachBatch and Databricks Connect</title>
      <link>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/95540#M39119</link>
      <description>&lt;P&gt;Hello!&lt;BR /&gt;&lt;BR /&gt;I'm also running into the same issue. Configuration:&lt;BR /&gt;- DBR 15.4 (Spark 3.5.0)&lt;BR /&gt;- databricks-connect 15.4.2&lt;/P&gt;&lt;P&gt;Trying to run this code:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import os
from databricks.connect import DatabricksSession

# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()

# Execute
df = spark.readStream.table("brz_stock_prices_job")

def update_metrics(batch_df, batch_id):
    size = batch_df.count()
    print(f"Batch size: {size}")

writer = df.writeStream.foreachBatch(update_metrics).start()&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;And I get this error:&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;Traceback (most recent call last):
[...]
packages/pyspark/sql/connect/client/core.py", line 2149, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) 

Connection reset by peer
JVM stacktrace:
java.io.IOException
	at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:197)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
	at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
[...]
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Given that the foreachBatch method has been supported in Spark Connect since 3.5.0, I would expect this to work. Any help would be appreciated!&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 22 Oct 2024 16:13:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/95540#M39119</guid>
      <dc:creator>olivier-soucy</dc:creator>
      <dc:date>2024-10-22T16:13:33Z</dc:date>
    </item>
    <item>
      <title>Re: Error in Spark Streaming with foreachBatch and Databricks Connect</title>
      <link>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/95566#M39124</link>
      <description>&lt;P&gt;I'm also trying to use the foreachBatch method of a Spark Streaming DataFrame with databricks-connect. Given that Spark Connect support was added to `foreachBatch` in 3.5.0, I was expecting this to work.&lt;/P&gt;&lt;P&gt;Configuration:&lt;BR /&gt;- DBR 15.4 (Spark 3.5.0)&lt;BR /&gt;- databricks-connect 15.4.2&lt;/P&gt;&lt;P&gt;Code:&lt;/P&gt;&lt;LI-CODE lang="python"&gt;import os
from databricks.connect import DatabricksSession

# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()

# Execute
df = spark.readStream.table("brz_stock_prices_job")

def update_metrics(batch_df, batch_id):
    size = batch_df.count()
    print(f"Batch size: {size}")&lt;/LI-CODE&gt;&lt;P&gt;Error:&lt;/P&gt;&lt;LI-CODE lang="markup"&gt;  File "/Users/osoucy/miniconda3/envs/lac/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 2149, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) 

Connection reset by peer
JVM stacktrace:
java.io.IOException
	at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:197)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
	at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
        [...]
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)&lt;/LI-CODE&gt;</description>
      <pubDate>Tue, 22 Oct 2024 16:55:21 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/error-in-spark-streaming-with-foreachbatch-and-databricks/m-p/95566#M39124</guid>
      <dc:creator>olivier-soucy</dc:creator>
      <dc:date>2024-10-22T16:55:21Z</dc:date>
    </item>
  </channel>
</rss>

