Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Re-establish SparkSession using Databricks connect after cluster restart

MarkusFra
New Contributor III

Hello,

When developing locally with Databricks Connect, how do I re-establish the SparkSession after the cluster has restarted? getOrCreate() seems to return the old, invalid SparkSession even after the cluster restart instead of creating a new one, or am I missing something?

Before the cluster restart, everything works fine:

>> spark = DatabricksSession.builder.getOrCreate()
DEBUG:databricks.connect:IPython module is present.
DEBUG:databricks.connect:Falling back to default configuration from the SDK.
INFO:databricks.sdk:loading DEFAULT profile from ~/.databrickscfg: host, token, cluster_id
DEBUG:databricks.sdk:Attempting to configure auth: pat
DEBUG:databricks.connect:Creating SparkSession from SDK config: <Config: host=https://adb-**************.**.azuredatabricks.net, token=***, auth_type=pat, cluster_id=****-******-********>
DEBUG:databricks.connect:Validating configuration by using the Databricks SDK
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): adb-6130442328907134.14.azuredatabricks.net:443
DEBUG:urllib3.connectionpool:https://adb-*******************.**.azuredatabricks.net:443 "GET /api/2.0/clusters/get?cluster_id=****-******-******** HTTP/1.1" 200 None
DEBUG:databricks.sdk:GET /api/2.0/clusters/get?cluster_id=****-******-********
< 200 OK
< {
<<< REDACTED: long message with api response >>>
< }
DEBUG:databricks.connect:Session validated successfully.

>> spark.sql("SELECT now()")
Out[7]: DataFrame[now(): timestamp]

 

 

After the cluster restart:

>> spark = DatabricksSession.builder.getOrCreate()
DEBUG:databricks.connect:IPython module is present.
DEBUG:databricks.connect:Falling back to default configuration from the SDK.
INFO:databricks.sdk:loading DEFAULT profile from ~/.databrickscfg: host, token, cluster_id
DEBUG:databricks.sdk:Attempting to configure auth: pat

>> spark.sql("SELECT now()")
Traceback (most recent call last):
  File "C:\***\lib\site-packages\IPython\core\interactiveshell.py", line 3508, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-9-4c2039c39977>", line 1, in <module>
    spark.sql("SELECT now()")
  File "C:\***\lib\site-packages\pyspark\sql\connect\session.py", line 572, in sql
    data, properties = self.client.execute_command(cmd.command(self._client))
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1139, in execute_command
    data, _, _, _, properties = self._execute_and_fetch(req, observations or {})
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1515, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req, observations):
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1493, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1805, in _handle_error
    raise error
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1486, in _execute_and_fetch_as_iterator
    yield from handle_response(b)
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1406, in handle_response
    self._verify_response_integrity(b)
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1937, in _verify_response_integrity
    raise PySparkAssertionError(
pyspark.errors.exceptions.base.PySparkAssertionError: Received incorrect server side session identifier for request. Please create a new Spark Session to reconnect. (5601ab48-a7cf-40c6-b59c-460381c816a6 != 8282a8c4-13cd-4fda-906e-2b1d8bec2115)

 

 

Shouldn't getOrCreate() recognize that it has to create a new session? Am I doing something wrong? How do I force creation of a new session? I cannot use spark.stop(), since that leads to the same error.

I am using databricks-connect 14.3.1 and Python 3.10.12.
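
A minimal sketch of one way to guard against this, using only calls that already appear in this thread (DatabricksSession.builder.getOrCreate(), spark.stop(), and the PySparkAssertionError from the traceback): run a cheap query to validate the session and rebuild it if the server-side session id no longer matches. Note that on databricks-connect 14.3 getOrCreate() may still hand back the cached client session, as described above, so this is not guaranteed to work around the issue.

from databricks.connect import DatabricksSession
from pyspark.errors import PySparkAssertionError

def get_valid_session():
    """Return a SparkSession, rebuilding it if it points at a stale server-side session."""
    spark = DatabricksSession.builder.getOrCreate()
    try:
        # Force a round trip to the cluster so the session id is actually checked.
        spark.sql("SELECT 1").collect()
    except PySparkAssertionError:
        # Server-side session id changed (e.g. the cluster was restarted).
        try:
            spark.stop()  # may raise the same mismatch error on a stale session
        except Exception:
            pass
        spark = DatabricksSession.builder.getOrCreate()
    return spark

spark = get_valid_session()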

2 REPLIES

MarkusFra
New Contributor III

Thank you for your reply, @Retired_mod. But there is no issue with the availability of databricks-connect. I had some time to look into it and found that this issue does not exist with databricks-connect 13.3 and a cluster on Runtime 13.3. It occurs with databricks-connect 14.3 and a cluster on Runtime 14.3.

databricks-connect 13.3 with a Runtime 13.3 cluster:

 

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.profile("DEBUGGING_133").getOrCreate()
spark.sql("SELECT 1")
# output: DataFrame[1: int]

# >>> Databricks cluster shuts down (e.g. because of timeout because of long running script)

spark.sql("SELECT 1")
# Cluster starts again automatically
# output: DataFrame[1: int]

 

databricks-connect 14.3 with a Runtime 14.3 cluster:

 

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.profile("DEBUGGING_133").getOrCreate()
spark.sql("SELECT 1")
# output: DataFrame[1: int]

# >>> Databricks cluster shuts down (e.g. because of timeout because of long running script)

spark.sql("SELECT 1")
# Cluster starts again automatically
# output: 
Traceback (most recent call last):
  File "****\lib\site-packages\IPython\core\interactiveshell.py", line 3508, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-7-e8eb9b165388>", line 1, in <module>
    spark.sql("SELECT 1")
  File "****\lib\site-packages\pyspark\sql\connect\session.py", line 572, in sql
    data, properties = self.client.execute_command(cmd.command(self._client))
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1139, in execute_command
    data, _, _, _, properties = self._execute_and_fetch(req, observations or {})
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1515, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req, observations):
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1493, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1805, in _handle_error
    raise error
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1486, in _execute_and_fetch_as_iterator
    yield from handle_response(b)
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1406, in handle_response
    self._verify_response_integrity(b)
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1937, in _verify_response_integrity
    raise PySparkAssertionError(
pyspark.errors.exceptions.base.PySparkAssertionError: Received incorrect server side session identifier for request. Please create a new Spark Session to reconnect. (ab413162-708a-423f-84c7-b04969ed3bf4 != 3c8ea3e4-e20f-4a31-82a0-ff938f4017c6)

 

Is this maybe a bug? Where can I see known issues or report this?

Michael_Chein
New Contributor II

If anyone encounters this problem, the solution that worked for me was to restart the Jupyter kernel. 
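
For completeness, a sketch of doing that restart from code, assuming an ipykernel-based Jupyter environment (in a plain Python script the equivalent is simply restarting the process); after the kernel comes back up, rebuild the session with DatabricksSession.builder.getOrCreate():

# Sketch, assuming ipykernel/Jupyter: shut the kernel process down;
# the Jupyter server then restarts it automatically.
import IPython

IPython.Application.instance().kernel.do_shutdown(restart=True)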
