cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Error in Spark Streaming with foreachBatch and Databricks Connect

TWib
New Contributor III

The following code throws an error locally in my IDE with Databricks-connect.

 

 

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.getOrCreate()
spark.sql("CREATE DATABASE IF NOT EXISTS sample")
spark.sql("DROP TABLE IF EXISTS sample.mvp")
spark.sql("DROP TABLE IF EXISTS sample.mvp_from_foreach_batch")

data = [("John", "Doe", 30), ("Jane", "Doe", 25), ("Mike", "Johnson", 35)]
df = spark.createDataFrame(data, ["FirstName", "LastName", "Age"])
df.write.format("delta").mode("overwrite").saveAsTable("sample.mvp")

def foreach_batch_function(df, epoch_id):
    df.write.format("delta").mode("overwrite").saveAsTable(
        "sample.mvp_from_foreach_batch"
    )

spark.readStream.table("sample.mvp").writeStream.foreachBatch(
    foreach_batch_function
).outputMode("append").trigger(availableNow=True).start().awaitTermination()

 

 

This code only works in notebooks or directly on a cluster. It will not run locally in an IDE with Databricks Connect.

Instead error

pyspark.errors.exceptions.connect.SparkException: No PYTHON_UID found for session (some uid) is raised

In general, Databricks Connect works fine for all other cases.

My local environment:

  • databricks-connect 14.3.1
  • databricks-sdk 0.26.0
  • pyspark 3.5.1
  • Python 3.11.4

Cluster running on

  • 15.1 (includes Apache Spark 3.5.0, Scala 2.12)
  • Single User Mode
7 REPLIES 7

TWib
New Contributor III

Only things differs is Python 3.11.0 on Cluster vs. 3.11.4 locally. This shouldnt be an issue.

Does this code run for you?

 

TWib
New Contributor III

@Retired_mod Notebooks in Databricks Workspace are also working for me (this was never the problem)

Locally in VSCode with DataBricks Connect it fails

TWib
New Contributor III

One more finding: It seems only to occur in single user cluster.

TWib
New Contributor III

This is still unresolved. Internally we have dropped streaming for now because of so many problems, another ticket with support is open.

Currently I do not recommend using streaming with foreach if you want to use databricks connect.

olivier-soucy
Contributor

I'm experiencing the same issue.
Configuration:
- Cluster is on DBR 15.4 LTS (Spark 3.5.)
- Databricks-connect is version 15.4.2

Sample code:

import sys
from databricks.connect import DatabricksSession

# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()

# Execute
df = spark.readStream.table("brz_stock_prices_job")

def update_metrics(batch_df, batch_id):
   size = batch_df.count()
   
writer = df.writeStream.foreachBatch(update_metrics).start()

And the error that I get:

pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) Connection reset by peer

JVM stacktrace:
java.io.IOException
	at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:197)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
	at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
	at com.databricks.spark.connect.service.LocalCredentialsCache$.cacheCredentialsAndRun(LocalCredentialsCache.scala:226)
	at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.pythonForeachBatchWrapper(StreamingForeachBatchHelper.scala:133)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.handleWriteStreamOperationStart(SparkConnectPlanner.scala:3492)
	at org.apache.spark.sql.connect.planner.SparkConnectPlanner.process(SparkConnectPlanner.scala:2853)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.handleCommand(ExecuteThreadRunner.scala:336)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:245)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:178)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:341)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:1180)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:341)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:97)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:84)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:239)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:83)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:340)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:178)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:128)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.$anonfun$run$2(ExecuteThreadRunner.scala:562)
	at com.databricks.unity.UCSEphemeralState$Handle.runWith(UCSEphemeralState.scala:51)
	at com.databricks.unity.HandleImpl.runWith(UCSHandle.scala:104)
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)

Given that foreachBatch function added support for spark connect in 3.5.0, I would expect this to work. Any help would be appreciated!

-> www.laktory.ai

olivier-soucy
Contributor

Hello!

I'm also running into the same issue. Configuration:
- DBR 15.4 (Spark 3.5.0)
- databricks-connect 15.4.2

Trying to run this code:

 

 

import os
from databricks.connect import DatabricksSession

# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()

# Execute
df = spark.readStream.table("brz_stock_prices_job")

def update_metrics(batch_df, batch_id):
    size = batch_df.count()
    print(f"Batch size: {size}")

writer = df.writeStream.foreachBatch(update_metrics).start()

 

 

And I get this error:

 

 

Traceback (most recent call last):
[...]
packages/pyspark/sql/connect/client/core.py", line 2149, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) 

Connection reset by peer
JVM stacktrace:
java.io.IOException
	at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:197)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
	at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
[...]
	at com.databricks.unity.HandleImpl.$anonfun$runWithAndClose$1(UCSHandle.scala:109)
	at scala.util.Using$.resource(Using.scala:269)
	at com.databricks.unity.HandleImpl.runWithAndClose(UCSHandle.scala:108)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)

 

 

Given the for each batch method is supported in spark connect since 3.5.0, I would assume this to work. Any help would be appreciated! 

-> www.laktory.ai

olivier-soucy
Contributor

I'm also trying to use the foreachBatch method of a Spark Streaming DataFrame with databricks-connect. Given that spark connect supported was added to  `foreachBatch` in 3.5.0, I was expecting this to work.

Configuration:
- DBR 15.4 (Spark 3.5.0)
- databricks-connect 15.4.2

Code:

import os
from databricks.connect import DatabricksSession

# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()

# Execute
df = spark.readStream.table("brz_stock_prices_job")

def update_metrics(batch_df, batch_id):
    size = batch_df.count()
    print(f"Batch size: {size}")

Error:

  File "/Users/osoucy/miniconda3/envs/lac/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 2149, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) 

Connection reset by peer
JVM stacktrace:
java.io.IOException
	at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:197)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
	at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
        [...]
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)
-> www.laktory.ai

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group