05-12-2024 10:40 PM
The following code throws an error when run locally in my IDE with Databricks Connect.
from databricks.connect import DatabricksSession
spark = DatabricksSession.builder.getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS sample")
spark.sql("DROP TABLE IF EXISTS sample.mvp")
spark.sql("DROP TABLE IF EXISTS sample.mvp_from_foreach_batch")
data = [("John", "Doe", 30), ("Jane", "Doe", 25), ("Mike", "Johnson", 35)]
df = spark.createDataFrame(data, ["FirstName", "LastName", "Age"])
df.write.format("delta").mode("overwrite").saveAsTable("sample.mvp")
def foreach_batch_function(df, epoch_id):
    df.write.format("delta").mode("overwrite").saveAsTable(
        "sample.mvp_from_foreach_batch"
    )
spark.readStream.table("sample.mvp").writeStream.foreachBatch(
foreach_batch_function
).outputMode("append").trigger(availableNow=True).start().awaitTermination()
This code only works in notebooks or directly on a cluster. It will not run locally in an IDE with Databricks Connect.
Instead, the following error is raised:
pyspark.errors.exceptions.connect.SparkException: No PYTHON_UID found for session (some uid)
In general, Databricks Connect works fine for all other cases.
My local environment:
Cluster running on
05-21-2024 06:09 AM
The only difference is Python 3.11.0 on the cluster vs. 3.11.4 locally. This shouldn't be an issue.
Does this code run for you?
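For reference, my understanding of the Databricks Connect requirements is that the local and cluster Python versions need to agree on the minor version, not the patch version, which is why 3.11.0 vs. 3.11.4 should be fine. A minimal sketch of that comparison follows; the helper names `minor_version` and `python_versions_compatible` are made up for illustration and are not part of any Databricks API.

```python
import sys


def minor_version(version: str) -> tuple:
    """Return (major, minor) from a version string such as '3.11.4'."""
    major, minor, *_ = version.split(".")
    return int(major), int(minor)


def python_versions_compatible(local: str, cluster: str) -> bool:
    """Assumed rule: only major.minor has to match; the patch level may differ."""
    return minor_version(local) == minor_version(cluster)


# Version of the interpreter running this script, e.g. '3.11.4'.
local_python = ".".join(str(part) for part in sys.version_info[:3])

# The cluster version is typed in by hand here; it is not queried from Databricks.
print(local_python, python_versions_compatible(local_python, "3.11.0"))
```

If this prints False for your setup, the Python mismatch is worth ruling out before digging into the streaming error itself.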
05-21-2024 10:21 PM
@Retired_mod Notebooks in the Databricks workspace also work for me (this was never the problem).
Locally in VS Code with Databricks Connect, it fails.
05-27-2024 10:28 PM
One more finding: it seems to occur only on single-user clusters.
05-30-2024 10:27 PM
This is still unresolved. Internally we have dropped streaming for now because of the many problems; another ticket with support is open.
Currently I do not recommend using streaming with foreachBatch if you want to use Databricks Connect.
10-22-2024 08:58 AM
I'm experiencing the same issue.
Configuration:
- Cluster is on DBR 15.4 LTS (Spark 3.5.0)
- databricks-connect is version 15.4.2
Sample code:
import sys
from databricks.connect import DatabricksSession
# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()
# Execute
df = spark.readStream.table("brz_stock_prices_job")
def update_metrics(batch_df, batch_id):
    size = batch_df.count()
writer = df.writeStream.foreachBatch(update_metrics).start()
And the error that I get:
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) Connection reset by peer
JVM stacktrace:
java.io.IOException
at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
at com.databricks.spark.connect.service.LocalCredentialsCache$.cacheCredentialsAndRun(LocalCredentialsCache.scala:226)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:133)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3492)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2853)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:336)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:245)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:178)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:239)
at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:178)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:128)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:562)
at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)
Given that foreachBatch gained Spark Connect support in Spark 3.5.0, I would expect this to work. Any help would be appreciated!
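Reading the trace above from the top, the first frame that is not part of the JDK is `StreamingPythonRunner.init`, called from `StreamingForeachBatchHelper`. In other words, the server was reading an integer from a socket while setting up the Python runner for the foreachBatch function when the connection was reset. That points toward the Python worker on the cluster rather than the query logic, though the trace alone does not prove it. A small sketch for pulling that frame out of a pasted trace; the function name and the prefix list are assumptions made for this example.

```python
from typing import Optional

# Package prefixes treated as JDK/Scala runtime frames (an assumption of this sketch).
RUNTIME_PREFIXES = ("java.", "javax.", "sun.", "scala.")


def first_non_runtime_frame(stacktrace: str) -> Optional[str]:
    """Return the first 'at ...' frame that is not a JDK/Scala runtime frame."""
    for raw_line in stacktrace.splitlines():
        line = raw_line.strip()
        if not line.startswith("at "):
            continue  # skip the exception header and "[...]" elisions
        frame = line[len("at "):]
        if not frame.startswith(RUNTIME_PREFIXES):
            return frame
    return None


# A few frames copied from the trace in this post.
trace = """java.io.IOException
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2853)
"""
print(first_non_runtime_frame(trace))
```

The same helper can be pointed at the full trace; including that frame in a support ticket may make it easier to triage.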
10-22-2024 09:10 AM - edited 10-22-2024 09:13 AM
Hello!
I'm also running into the same issue. Configuration:
- DBR 15.4 (Spark 3.5.0)
- databricks-connect 15.4.2
Trying to run this code:
import os
from databricks.connect import DatabricksSession
# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()
# Execute
df = spark.readStream.table("brz_stock_prices_job")
def update_metrics(batch_df, batch_id):
    size = batch_df.count()
    print(f"Batch size: {size}")
writer = df.writeStream.foreachBatch(update_metrics).start()
And I get this error:
Traceback (most recent call last):
[...]
packages/pyspark/sql/connect/client/core.py", line 2149, in _handle_rpc_error
raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException)
Connection reset by peer
JVM stacktrace:
java.io.IOException
at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
[...]
at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
at scala.util.Using$.resource(Using.scala:269)
at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)
Given that the foreachBatch method has been supported in Spark Connect since 3.5.0, I would expect this to work. Any help would be appreciated!
10-22-2024 09:55 AM
I'm also trying to use the foreachBatch method of a Spark Structured Streaming DataFrame with databricks-connect. Given that Spark Connect support was added to `foreachBatch` in 3.5.0, I was expecting this to work.
Configuration:
- DBR 15.4 (Spark 3.5.0)
- databricks-connect 15.4.2
Code:
import os
from databricks.connect import DatabricksSession
# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()
# Execute
df = spark.readStream.table("brz_stock_prices_job")
def update_metrics(batch_df, batch_id):
    size = batch_df.count()
    print(f"Batch size: {size}")
Error:
File "/Users/osoucy/miniconda3/envs/lac/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 2149, in _handle_rpc_error
raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException)
Connection reset by peer
JVM stacktrace:
java.io.IOException
at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
[...]
at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)