Error in Spark Streaming with foreachBatch and Databricks Connect
05-12-2024 10:40 PM
The following code throws an error when run locally in my IDE with Databricks Connect.
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS sample")
spark.sql("DROP TABLE IF EXISTS sample.mvp")
spark.sql("DROP TABLE IF EXISTS sample.mvp_from_foreach_batch")
data = [("John", "Doe", 30), ("Jane", "Doe", 25), ("Mike", "Johnson", 35)]
df = spark.createDataFrame(data, ["FirstName", "LastName", "Age"])
df.write.format("delta").mode("overwrite").saveAsTable("sample.mvp")
def foreach_batch_function(df, epoch_id):
    df.write.format("delta").mode("overwrite").saveAsTable(
        "sample.mvp_from_foreach_batch"
    )

spark.readStream.table("sample.mvp").writeStream.foreachBatch(
    foreach_batch_function
).outputMode("append").trigger(availableNow=True).start().awaitTermination()
This code only works in notebooks or directly on a cluster; it will not run locally in an IDE with Databricks Connect. Instead, the following error is raised:
pyspark.errors.exceptions.connect.SparkException: No PYTHON_UID found for session (some uid)
In general, Databricks Connect works fine in all other cases.
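For example, a plain batch read of the same table runs fine over Databricks Connect (a minimal sketch reusing the sample.mvp table from the repro above):

from databricks.connect import DatabricksSession

# Batch reads and writes work as expected over Databricks Connect;
# only the streaming foreachBatch path fails.
spark = DatabricksSession.builder.getOrCreate()
spark.read.table("sample.mvp").filter("Age > 28").show()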
My local environment:
- databricks-connect 14.3.1
- databricks-sdk 0.26.0
- pyspark 3.5.1
- Python 3.11.4
Cluster running on
- 15.1 (includes Apache Spark 3.5.0, Scala 2.12)
- Single User Mode
05-21-2024 06:09 AM
The only thing that differs is Python 3.11.0 on the cluster vs. 3.11.4 locally, and that shouldn't be an issue.
Does this code run for you?
05-21-2024 10:21 PM
@Retired_mod Notebooks in the Databricks workspace also work for me (that was never the problem).
Locally in VS Code with Databricks Connect it fails.
05-27-2024 10:28 PM
One more finding: it seems to occur only on single-user clusters.
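For anyone trying to reproduce this, here is a hedged sketch of how to confirm a cluster's access mode with the databricks-sdk (the cluster ID is a placeholder):

from databricks.sdk import WorkspaceClient

# Check the cluster's access mode; the issue appears tied to single-user clusters.
w = WorkspaceClient()
cluster = w.clusters.get(cluster_id="<your-cluster-id>")  # placeholder ID
print(cluster.data_security_mode)  # e.g. DataSecurityMode.SINGLE_USER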
05-30-2024 10:27 PM
This is still unresolved. Internally we have dropped streaming for now because of the many problems; another ticket with support is open.
For now I do not recommend using streaming with foreachBatch if you want to use Databricks Connect.
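If the batch function only writes to a table, one possible workaround is to bypass foreachBatch entirely with writeStream.toTable. A sketch, assuming append semantics are acceptable; the checkpoint path is illustrative:

from databricks.connect import DatabricksSession

# Workaround sketch: avoid the foreachBatch Python worker by streaming the
# source table straight into the target table.
spark = DatabricksSession.builder.getOrCreate()

query = (
    spark.readStream.table("sample.mvp")
    .writeStream.outputMode("append")
    .option("checkpointLocation", "/tmp/checkpoints/mvp_copy")  # illustrative path
    .trigger(availableNow=True)
    .toTable("sample.mvp_from_foreach_batch")
)
query.awaitTermination()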
10-22-2024 08:58 AM
I'm experiencing the same issue.
Configuration:
- Cluster on DBR 15.4 LTS (Spark 3.5.0)
- databricks-connect 15.4.2
Sample code:
import sys
from databricks.connect import DatabricksSession
# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()
# Execute
df = spark.readStream.table("brz_stock_prices_job")
def update_metrics(batch_df, batch_id):
    size = batch_df.count()

writer = df.writeStream.foreachBatch(update_metrics).start()
And the error that I get:
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) Connection reset by peer
JVM stacktrace:
java.io.IOException
at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
at com.databricks.spark.connect.service.LocalCredentialsCache$.cacheCredentialsAndRun(LocalCredentialsCache.scala:226)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:133)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3492)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2853)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:336)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:245)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:178)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:239)
at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:178)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:128)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:562)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)
Given that foreachBatch gained Spark Connect support in 3.5.0, I would expect this to work. Any help would be appreciated!
10-22-2024 09:10 AM - edited 10-22-2024 09:13 AM
Hello!
I'm also running into the same issue. Configuration:
- DBR 15.4 (Spark 3.5.0)
- databricks-connect 15.4.2
Trying to run this code:
import os
from databricks.connect import DatabricksSession
# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()
# Execute
df = spark.readStream.table("brz_stock_prices_job")
def update_metrics(batch_df, batch_id):
    size = batch_df.count()
    print(f"Batch size: {size}")

writer = df.writeStream.foreachBatch(update_metrics).start()
And I get this error:
Traceback (most recent call last):
[...]
packages/pyspark/sql/connect/client/core.py", line 2149, in _handle_rpc_error
raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException)
Connection reset by peer
JVM stacktrace:
java.io.IOException
at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
[...]
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)
Given that the foreachBatch method has been supported in Spark Connect since 3.5.0, I would expect this to work. Any help would be appreciated!
10-22-2024 09:55 AM
I'm also trying to use the foreachBatch method of a Spark Streaming DataFrame with databricks-connect. Given that Spark Connect support was added to `foreachBatch` in 3.5.0, I was expecting this to work.
Configuration:
- DBR 15.4 (Spark 3.5.0)
- databricks-connect 15.4.2
Code:
import os
from databricks.connect import DatabricksSession
# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()
# Execute
df = spark.readStream.table("brz_stock_prices_job")
def update_metrics(batch_df, batch_id):
    size = batch_df.count()
    print(f"Batch size: {size}")

writer = df.writeStream.foreachBatch(update_metrics).start()
Error:
File "/Users/osoucy/miniconda3/envs/lac/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 2149, in _handle_rpc_error
raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException)
Connection reset by peer
JVM stacktrace:
java.io.IOException
at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
[...]
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)

