Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
10-22-2024 08:58 AM
I'm experiencing the same issue.
Configuration:
- Cluster is on DBR 15.4 LTS (Spark 3.5.)
- Databricks-connect is version 15.4.2
Sample code:
import sys
from databricks.connect import DatabricksSession
# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()
# Execute
df = spark.readStream.table("brz_stock_prices_job")
def update_metrics(batch_df, batch_id):
size = batch_df.count()
writer = df.writeStream.foreachBatch(update_metrics).start()And the error that I get:
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) Connection reset by peer
JVM stacktrace:
java.io.IOException
at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
at com.databricks.spark.connect.service.LocalCredentialsCache$.cacheCredentialsAndRun(LocalCredentialsCache.scala:226)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:133)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3492)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2853)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:336)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:245)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:178)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:239)
at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:178)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:128)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:562)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)Given that foreachBatch function added support for spark connect in 3.5.0, I would expect this to work. Any help would be appreciated!
-> www.laktory.ai