Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

StreamCorruptedException, databricks-connect 9.1

JordiDekker
New Contributor III

Last week, around the 21st of March, we started having issues with databricks-connect (DBR 9.1 LTS). "databricks-connect test" works, but the following code snippet:

from pyspark.sql import SparkSession

# Create (or reuse) the Spark session and grab its SparkContext
spark = SparkSession.builder.getOrCreate()
sparkcontext = spark.sparkContext

# Distribute 0..9 across 10 partitions, add 1 to each element, and sum
rdd1 = sparkcontext.parallelize(range(10), 10)
rdd2 = rdd1.map(lambda x: x + 1)
sum2 = rdd2.sum()

assert sum2 == 55  # 1 + 2 + ... + 10

fails with this error:

log4j:WARN No appenders could be found for logger (org.apache.spark.util.Utils).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
View job details at https://[redacted].azuredatabricks.net/?o=[redacted]#/setting/clusters/[redacted]/sparkUi
Traceback (most recent call last):
  File "main.py", line 9, in <module>
    sum2 = rdd2.sum()
  File "/home/[redacted]/dev/debug-databricks-connect/.venv/lib/python3.8/site-packages/pyspark/rdd.py", line 1259, in sum
    return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
  File "/home/[redacted]/dev/debug-databricks-connect/.venv/lib/python3.8/site-packages/pyspark/rdd.py", line 1113, in fold
    vals = self.mapPartitions(func).collect()
  File "/home/[redacted]/dev/debug-databricks-connect/.venv/lib/python3.8/site-packages/pyspark/rdd.py", line 967, in collect
    sock_info = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
  File "/home/[redacted]/dev/debug-databricks-connect/.venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1304, in __call__
    return_value = get_return_value(
  File "/home/[redacted]/dev/debug-databricks-connect/.venv/lib/python3.8/site-packages/pyspark/sql/utils.py", line 117, in deco
    return f(*a, **kw)
  File "/home/[redacted]/dev/debug-databricks-connect/.venv/lib/python3.8/site-packages/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.collectAndServe.
: java.io.StreamCorruptedException: invalid type code: 01
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1700)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
        at java.io.ObjectInputStream.readArray(ObjectInputStream.java:2119)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1657)
        at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2431)
        at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2355)
        at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2213)
        at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1669)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
        at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
        at org.apache.spark.sql.util.ProtoSerializer.$anonfun$deserializeObject$1(ProtoSerializer.scala:6631)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at org.apache.spark.sql.util.ProtoSerializer.deserializeObject(ProtoSerializer.scala:6616)
        at com.databricks.service.SparkServiceRPCHandler.execute0(SparkServiceRPCHandler.scala:728)
        at com.databricks.service.SparkServiceRPCHandler.$anonfun$executeRPC0$1(SparkServiceRPCHandler.scala:477)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at com.databricks.service.SparkServiceRPCHandler.executeRPC0(SparkServiceRPCHandler.scala:372)
        at com.databricks.service.SparkServiceRPCHandler$$anon$2.call(SparkServiceRPCHandler.scala:323)
        at com.databricks.service.SparkServiceRPCHandler$$anon$2.call(SparkServiceRPCHandler.scala:309)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at com.databricks.service.SparkServiceRPCHandler.$anonfun$executeRPC$1(SparkServiceRPCHandler.scala:359)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
        at com.databricks.service.SparkServiceRPCHandler.executeRPC(SparkServiceRPCHandler.scala:336)
        at com.databricks.service.SparkServiceRPCServlet.doPost(SparkServiceRPCServer.scala:167)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
        at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
        at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:799)
        at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:550)
        at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:190)
        at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501)
        at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
        at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
        at org.eclipse.jetty.server.Server.handle(Server.java:516)
        at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388)
        at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:633)
        at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:380)
        at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:277)
        at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
        at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
        at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:338)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:315)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:173)
        at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:131)
        at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:383)
        at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:882)
        at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:1036)
        at java.lang.Thread.run(Thread.java:750)

The Python environment (pyproject.toml):

[tool.poetry]
name = "dev"
version = "0.1.0"
description = ""
authors = ["John Doe"]

[tool.poetry.dependencies]
python = "^3.8"
databricks-connect = ">=9.1.0,<9.2.0"

[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"

Since nothing changed on our side, I would appreciate any help.
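One thing worth noting: our constraint floats within the 9.1.x series, so a new databricks-connect patch release around that date could introduce a client/server mismatch even though our own code did not change. A minimal sketch of how to rule that out by pinning, assuming the last known-good release can be identified (the version below is a placeholder, not a confirmed release number):

[tool.poetry.dependencies]
python = "^3.8"
# Placeholder pin: substitute the exact 9.1.x release that worked before ~March 21
databricks-connect = "9.1.0"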

5 REPLIES

JordiDekker
New Contributor III

Update: the snippet runs fine on an 11.3 LTS cluster (with databricks-connect 11.3.*, of course).

Is this a regression in 9.1? Could it have something to do with the log4j -> reload4j replacement in DBR <= 10.4?
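In the meantime, one workaround sketch that may sidestep the failing path: the stack trace dies while deserializing the RDD job on the Spark service side (ProtoSerializer.deserializeObject), and the equivalent DataFrame computation avoids the Python RDD closure that gets Java-serialized. Assuming only a reachable cluster, the same check can be written as:

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# spark.range(10) produces ids 0..9; add 1 to each and aggregate server-side
df = spark.range(10).withColumn("plus_one", F.col("id") + 1)
total = df.agg(F.sum("plus_one")).collect()[0][0]

assert total == 55  # same result as the RDD version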

Aviral-Bhardwaj
Esteemed Contributor III

I am getting the error below. How are you running this?

An error occurred while calling o390.createRDDFromTrustedPath. Trace: py4j.security.Py4JSecurityException: Method public org.apache.spark.api.java.JavaRDD org.apache.spark.sql.SparkSession.createRDDFromTrustedPath(java.lang.String,int) is not whitelisted on class class org.apache.spark.sql.SparkSessi

@AviralBhardwaj

In this case locally, in a clean Poetry environment.
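For reference, a quick diagnostic sketch for checking that the local client and the cluster agree on versions, since a client/server mismatch is a classic cause of StreamCorruptedException with databricks-connect. The conf key below is an assumption based on the cluster tags Databricks normally exposes; it may not be present on every runtime:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Spark version the local databricks-connect client reports
print("client Spark version:", spark.version)

# Runtime version reported by the cluster; assumed conf key, falls back to "unknown"
print("server DBR version:", spark.conf.get("spark.databricks.clusterUsageTags.sparkVersion", "unknown"))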

Anonymous
Not applicable

Hi @Jordi Dekker,

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation, and let us know if you need any further assistance!

Hi @Vidula Khanna,

No answer has been given yet.
