Re-establish SparkSession using Databricks connect after cluster restart

MarkusFra
New Contributor II

Hello,

When developing locally with Databricks Connect, how do I re-establish the SparkSession after the cluster has restarted? getOrCreate() seems to return the old, invalid SparkSession even after the cluster restart instead of creating a new one. Or am I missing something?

Before the cluster restart, everything works fine:

 

 

>> spark = DatabricksSession.builder.getOrCreate()
DEBUG:databricks.connect:IPython module is present.
DEBUG:databricks.connect:Falling back to default configuration from the SDK.
INFO:databricks.sdk:loading DEFAULT profile from ~/.databrickscfg: host, token, cluster_id
DEBUG:databricks.sdk:Attempting to configure auth: pat
DEBUG:databricks.connect:Creating SparkSession from SDK config: <Config: host=https://adb-**************.**.azuredatabricks.net, token=***, auth_type=pat, cluster_id=****-******-********>
DEBUG:databricks.connect:Validating configuration by using the Databricks SDK
DEBUG:urllib3.connectionpool:Starting new HTTPS connection (1): adb-6130442328907134.14.azuredatabricks.net:443
DEBUG:urllib3.connectionpool:https://adb-*******************.**.azuredatabricks.net:443 "GET /api/2.0/clusters/get?cluster_id=****-******-******** HTTP/1.1" 200 None
DEBUG:databricks.sdk:GET /api/2.0/clusters/get?cluster_id=****-******-********
< 200 OK
< {
<<< REDACTED: long message with api response >>>
< }
DEBUG:databricks.connect:Session validated successfully.

>> spark.sql("SELECT now()")
Out[7]: DataFrame[now(): timestamp]

 

 

After restart of the cluster:

 

 

>> spark = DatabricksSession.builder.getOrCreate()
DEBUG:databricks.connect:IPython module is present.
DEBUG:databricks.connect:Falling back to default configuration from the SDK.
INFO:databricks.sdk:loading DEFAULT profile from ~/.databrickscfg: host, token, cluster_id
DEBUG:databricks.sdk:Attempting to configure auth: pat

>> spark.sql("SELECT now()")
Traceback (most recent call last):
  File "C:\***\lib\site-packages\IPython\core\interactiveshell.py", line 3508, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-9-4c2039c39977>", line 1, in <module>
    spark.sql("SELECT now()")
  File "C:\***\lib\site-packages\pyspark\sql\connect\session.py", line 572, in sql
    data, properties = self.client.execute_command(cmd.command(self._client))
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1139, in execute_command
    data, _, _, _, properties = self._execute_and_fetch(req, observations or {})
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1515, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req, observations):
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1493, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1805, in _handle_error
    raise error
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1486, in _execute_and_fetch_as_iterator
    yield from handle_response(b)
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1406, in handle_response
    self._verify_response_integrity(b)
  File "C:\***\lib\site-packages\pyspark\sql\connect\client\core.py", line 1937, in _verify_response_integrity
    raise PySparkAssertionError(
pyspark.errors.exceptions.base.PySparkAssertionError: Received incorrect server side session identifier for request. Please create a new Spark Session to reconnect. (5601ab48-a7cf-40c6-b59c-460381c816a6 != 8282a8c4-13cd-4fda-906e-2b1d8bec2115)

 

 

Shouldn't getOrCreate() recognize that it has to create a new Session? Am I doing something wrong? How do I forcibly create a new Session? I cannot use spark.stop() since this leads to the same error.

I am using databricks-connect 14.3.1 and Python 3.10.12.
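
One client-side workaround for the error above is to detect the stale-session message and rebuild the session before retrying. The following is only a minimal sketch, not a confirmed fix: `run_with_fresh_session`, `is_stale_session_error`, and `make_session` are hypothetical names. In particular, `make_session` must return a genuinely new session on every call; how to force that (rather than getting the cached one back from getOrCreate()) depends on the databricks-connect version and is left to the caller.

```python
from typing import Any, Callable

# Substring taken from the PySparkAssertionError message in the traceback above.
STALE_MARKER = "incorrect server side session identifier"


def is_stale_session_error(exc: BaseException) -> bool:
    """Return True if the exception text looks like the stale-session error."""
    return STALE_MARKER in str(exc)


def run_with_fresh_session(
    make_session: Callable[[], Any],
    action: Callable[[Any], Any],
    max_attempts: int = 2,
) -> Any:
    """Run action(session); on a stale-session error, rebuild the session and retry.

    make_session is a caller-supplied (hypothetical) factory that is assumed to
    return a NEW session on each call. This sketch does not know how to force that.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    session = make_session()
    for attempt in range(1, max_attempts + 1):
        try:
            return action(session)
        except Exception as exc:
            # Re-raise unrelated errors, and give up once the attempts are used.
            if not is_stale_session_error(exc) or attempt == max_attempts:
                raise
            # Stale session detected: ask the factory for a replacement and retry.
            session = make_session()
```

With Databricks Connect, `action` could be something like `lambda s: s.sql("SELECT now()").collect()`. Matching on the message text is brittle, so the marker string may need adjusting if the wording changes between releases.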

2 REPLIES

Kaniz
Community Manager

Hi @MarkusFra, to handle this situation more elegantly, consider the following approach:

try:
    from databricks.connect import DatabricksSession
    spark = DatabricksSession.builder.getOrCreate()
except ImportError:
    from pyspark.sql import SparkSession
    spark = SparkSession.builder.getOrCreate()

In this code snippet:

  • We first try to import DatabricksSession from databricks.connect. If successful, we create a Databricks Connect Spark session.
  • If the import fails (indicating that Databricks Connect is not available), we fall back to creating a regular Spark session using SparkSession.builder.getOrCreate().

MarkusFra
New Contributor II

Thank you for your reply, @Kaniz. But the availability of databricks-connect is not the issue. I had a bit of time to look into it and found that the problem does not occur with databricks-connect 13.3 and a cluster on Runtime 13.3. It does occur with databricks-connect 14.3 and a cluster on Runtime 14.3.

databricks-connect-13.3 and Runtime 13.3 Cluster:

 

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.profile("DEBUGGING_133").getOrCreate()
spark.sql("SELECT 1")
# output: DataFrame[1: int]

# >>> Databricks cluster shuts down (e.g. because of timeout because of long running script)

spark.sql("SELECT 1")
# Cluster starts again automatically
# output: DataFrame[1: int]

 

databricks-connect-14.3 and Runtime 14.3 Cluster:

 

from databricks.connect import DatabricksSession

spark = DatabricksSession.builder.profile("DEBUGGING_133").getOrCreate()
spark.sql("SELECT 1")
# output: DataFrame[1: int]

# >>> Databricks cluster shuts down (e.g. because of timeout because of long running script)

spark.sql("SELECT 1")
# Cluster starts again automatically
# output: 
Traceback (most recent call last):
  File "****\lib\site-packages\IPython\core\interactiveshell.py", line 3508, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-7-e8eb9b165388>", line 1, in <module>
    spark.sql("SELECT 1")
  File "****\lib\site-packages\pyspark\sql\connect\session.py", line 572, in sql
    data, properties = self.client.execute_command(cmd.command(self._client))
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1139, in execute_command
    data, _, _, _, properties = self._execute_and_fetch(req, observations or {})
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1515, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(req, observations):
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1493, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1805, in _handle_error
    raise error
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1486, in _execute_and_fetch_as_iterator
    yield from handle_response(b)
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1406, in handle_response
    self._verify_response_integrity(b)
  File "****\lib\site-packages\pyspark\sql\connect\client\core.py", line 1937, in _verify_response_integrity
    raise PySparkAssertionError(
pyspark.errors.exceptions.base.PySparkAssertionError: Received incorrect server side session identifier for request. Please create a new Spark Session to reconnect. (ab413162-708a-423f-84c7-b04969ed3bf4 != 3c8ea3e4-e20f-4a31-82a0-ff938f4017c6)

 

Is this maybe a bug? Where can I see known issues or report this?
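
Because the behaviour differs between the 13.3 and 14.3 pairings, it may help to include the exact client version in any bug report. The snippet below is a minimal sketch using only the standard library. `major_minor` and `installed_client_version` are hypothetical helper names, the parsing assumes a plain numeric `major.minor` prefix such as `14.3.1`, and the cluster's runtime version still has to be read from the cluster configuration.

```python
from importlib.metadata import PackageNotFoundError, version
from typing import Optional, Tuple


def major_minor(version_string: str) -> Tuple[int, int]:
    """Extract (major, minor) from a version string such as '14.3.1'."""
    major, minor = version_string.split(".")[:2]
    return int(major), int(minor)


def installed_client_version(package: str = "databricks-connect") -> Optional[str]:
    """Return the installed version of the package, or None if it is not installed."""
    try:
        return version(package)
    except PackageNotFoundError:
        return None


if __name__ == "__main__":
    client = installed_client_version()
    print("databricks-connect:", client or "not installed")
    if client is not None:
        print("major.minor:", major_minor(client))
```

The resulting `(major, minor)` tuple can then be compared by hand against the runtime version of the cluster, for example 14.3 against 14.3.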
