olivier-soucy
Contributor

I'm also trying to use the foreachBatch method of a Spark Streaming DataFrame with databricks-connect. Given that spark connect supported was added to  `foreachBatch` in 3.5.0, I was expecting this to work.

Configuration:
- DBR 15.4 (Spark 3.5.0)
- databricks-connect 15.4.2

Code:

import os
from databricks.connect import DatabricksSession

# Setup
spark = DatabricksSession.builder.clusterId("0501-011833-vcux5w7j").getOrCreate()

# Execute
df = spark.readStream.table("brz_stock_prices_job")

def update_metrics(batch_df, batch_id):
    size = batch_df.count()
    print(f"Batch size: {size}")

Error:

  File "/Users/osoucy/miniconda3/envs/lac/lib/python3.10/site-packages/pyspark/sql/connect/client/core.py", line 2149, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (java.io.IOException) 

Connection reset by peer
JVM stacktrace:
java.io.IOException
	at sun.nio.ch.FileDispatcherImpl.read0(FileDispatcherImpl.java:-2)
	at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:39)
	at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
	at sun.nio.ch.IOUtil.read(IOUtil.java:197)
	at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
	at sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:208)
	at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:103)
	at java.io.BufferedInputStream.fill(BufferedInputStream.java:246)
	at java.io.BufferedInputStream.read(BufferedInputStream.java:265)
	at java.io.DataInputStream.readInt(DataInputStream.java:387)
	at org.apache.spark.api.python.StreamingPythonRunner.init(StreamingPythonRunner.scala:206)
	at org.apache.spark.sql.connect.planner.StreamingForeachBatchHelper$.$anonfun$pythonForeachBatchWrapper$3(StreamingForeachBatchHelper.scala:146)
        [...]
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:561)
-> www.laktory.ai