olivier-soucy
Contributor

I'm experiencing the same issue.
Configuration:
- Cluster is on DBR 15.4 LTS (Spark 3.5.)
- Databricks-connect is version 15.4.2

Sample code:

import sys
from databricks.connect import DatabricksSession

# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()

# Execute
df = spark.readStream.table("brz_stock_prices_job")

def update_metrics(batch_df, batch_id):
   size = batch_df.count()
   
writer = df.writeStream.foreachBatch(update_metrics).start()

And the error that I get:

pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) Connection reset by peer

JVM stacktrace:
java.io.IOException
	at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:197)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
	at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
	at com.databricks.spark.connect.service.LocalCredentialsCache$.cacheCredentialsAndRun(LocalCredentialsCache.scala:226)
	at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:133)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3492)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2853)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:336)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:245)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:178)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:239)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:178)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:128)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:562)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)

Given that foreachBatch function added support for spark connect in 3.5.0, I would expect this to work. Any help would be appreciated!

-> www.laktory.ai