Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Replay a stream after converting to liquid clustering fails

EndreM
New Contributor III

I have a problem replaying a stream. I need to replay it because the conversion to liquid clustering doesn't work: I see a lot of garbage collection, memory maxes out immediately, and then the driver restarts.

To debug the problem I try to force only 1 record to be read, so I run:
reader = reader.load(bronze_path).select("*", "_metadata").limit(1)
print("going to collect reader. Only 1 record")
reader.collect()

This gives error:
File /databricks/spark/python/pyspark/sql/connect/client/core.py:2155, in SparkConnectClient._handle_rpc_error(self, rpc_error)
2140 raise Exception(
2141 "Python versions in the Spark Connect client and server are different. "
2142 "To execute user-defined functions, client and server should have the "
(...)
2151 "https://docs.databricks.com/en/release-notes/serverless.html" target="_blank" rel="noopener noreferrer">https://docs.databricks.com/en/release-notes/serverless.html</a>.</span><span>"
2152 )
2153 # END-EDGE
-> 2155 raise convert_exception(
2156 info,
2157 status.message,
2158 self._fetch_enriched_error(info),
2159 self._display_server_stack_trace(),
2160 ) from None
2162 raise SparkConnectGrpcException(status.message) from None
2163 else:

And this is the explanation ChatGPT gives, which I find strange since it's running on Databricks:
This error usually means there's a mismatch between the Python versions (or environments) used by your Spark Connect client and the Spark server. When you call collect(), Spark Connect tries to run user-defined code, and if the Python versions differ the operation fails.

To fix this you should ensure that:

• The Python version in your client environment (where you're running the code) exactly matches the Python version configured on the Spark server/cluster.

• All relevant libraries are consistent between client and server.

Once they match, the collect() call should work without the RPC error.
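
A quick way to see which Python version the notebook driver is actually running, for comparison with the version the library was built for (just the standard library, nothing Databricks-specific):

# Print the driver's Python version to compare against the client/library build.
import sys
print(sys.version)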

 

8 REPLIES

Brahmareddy
Honored Contributor III

Hi EndreM,

How are you doing today? From what you've described, it looks like the error might be caused by a mismatch in the Python versions between your Databricks Connect client (if you're using something like PyCharm or VS Code) and the Databricks runtime on the cluster. Even though it seems strange since you're running on Databricks, using .collect() or similar actions through Spark Connect can sometimes fail if the environments don't match exactly.

A simple workaround is to run the same code directly in a Databricks notebook instead of through your IDE, which helps ensure everything is aligned. Also, instead of .limit(1).collect(), try .sample(False, 0.0001).take(1) or .head(1), which can be lighter and avoid scanning the whole dataset; see the sketch below. Let me know if you'd like help reviewing your streaming setup or tuning cluster memory. Happy to help you sort it out smoothly!
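
Something along these lines, assuming bronze_path points at a Delta table that can also be read as a plain batch DataFrame for debugging (adjust the format if yours is different):

# Read the bronze path as a one-off batch DataFrame just for debugging,
# so no streaming query has to run only to peek at a record.
# Assumes the notebook-provided spark session and the bronze_path from your snippet.
debug_df = spark.read.format("delta").load(bronze_path)

# Grab a single row; head(1) on a batch DataFrame is much lighter than collect().
print(debug_df.select("*", "_metadata").head(1))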

Regards,

Brahma

EndreM
New Contributor III

Thanks, I'll try that method. I did run it from a Databricks notebook. Our stream is defined with 3 notebooks as well as a library. I previously got an error when the library was built with an old Python version, and after that I upgraded Databricks to 15.4. So I haven't seen this error before, and I find it strange when running the code in a Databricks notebook.

EndreM
New Contributor III

It looks like the table is a streaming source, so head(1) is not allowed. limit(1) is allowed, but you're not allowed to inspect the dataframe until a streaming (micro-batch) query is started.
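
From what I can tell, the only way to peek at the data is inside foreachBatch once a query has actually started; roughly something like this (the checkpoint path is just a throwaway placeholder):

# The streaming DataFrame itself can't be collected, but inside foreachBatch
# each micro-batch arrives as a normal batch DataFrame that can be inspected.
stream_df = (spark.readStream.format("delta")
             .load(bronze_path)
             .select("*", "_metadata"))

def inspect_batch(batch_df, batch_id):
    # limit/show are allowed here because batch_df is not a streaming DataFrame
    batch_df.limit(1).show(truncate=False)

(stream_df.writeStream
    .foreachBatch(inspect_batch)
    .trigger(availableNow=True)                        # drain what's available, then stop
    .option("checkpointLocation", "/tmp/_debug_chk")   # placeholder checkpoint path
    .start()
    .awaitTermination())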

EndreM
New Contributor III

I get another error which is perhaps related to this error:

com.databricks.unity.error.MissingCredentialScopeException: [UNITY_CREDENTIAL_SCOPE_MISSING_SCOPE] Missing Credential Scope. Failed to find Unity Credential Scope.. SQLSTATE: XXKUC
at com.databricks.unity.error.MissingCredentialScopeException$.withDebugLog(UCSExceptions.scala:62)
at com.databricks.unity.UCSExecutor.$anonfun$currentScope$1(UCSExecutor.scala:82)
at scala.Option.getOrElse(Option.scala:189)

 

Is Unity Catalog not set up correctly?

Brahmareddy
Honored Contributor III

Hey EndreM, Thanks a lot for the extra info!

Since you're running this from a Databricks notebook and still hit the Python version mismatch earlier, and are now seeing a MissingCredentialScopeException, it strongly points to a misconfiguration related to Unity Catalog permissions or the identity used to access the data. The error suggests that Unity Catalog is expecting a credential scope (essentially, a permission context) that isn't available in your current session; this usually happens when your cluster or user identity doesn't have access to the underlying external location, or the credential isn't properly scoped to your workspace or catalog.

To fix this, here are a few suggestions:

  • Check External Location Permissions: Make sure the external location (like your S3 path) is registered and you have the correct privileges set using GRANT in Unity Catalog.

  • Verify Credential Scope Setup: Go to the Unity Catalog configuration in your workspace and confirm that the external location is linked with a valid credential and a storage credential scope. You can do this via the UI or with CREATE/ALTER EXTERNAL LOCATION and CREATE/GRANT STORAGE CREDENTIAL.

  • Cluster Access Mode: If you’re using a shared or assigned cluster, make sure it’s in Shared Access Mode—this is required to work with Unity Catalog, especially with fine-grained permissions.

  • Library Compatibility: Double-check that your custom library was rebuilt with Python 3.11 (the default in DBR 15.4), and no old .pyc or mismatched wheel files are lingering.

Once Unity Catalog permissions and scopes are configured properly, and your cluster is using the correct access mode, this kind of credential error should go away. Let me know if you want help checking the config or writing any of the SQL commands—I’m happy to help you get this sorted!
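
If it helps, the first two checks can be run straight from a notebook with SQL; roughly something like this (the location, catalog, schema, table, and group names are only placeholders):

# Inspect the external location and which storage credential it is linked to.
spark.sql("DESCRIBE EXTERNAL LOCATION my_ext_location").show(truncate=False)

# Grant the privileges the streaming job needs on the location and the silver table.
spark.sql("GRANT READ FILES, WRITE FILES ON EXTERNAL LOCATION my_ext_location TO `data_engineers`")
spark.sql("GRANT SELECT, MODIFY ON TABLE my_catalog.my_schema.silver TO `data_engineers`")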

Regards,

Brahma

EndreM
New Contributor III

I don't know the answer to the first two questions, but it's a shared compute, and the library is built with Python 3.12 (compatible with 3.11 and 3.10). The compute is on Databricks Runtime 15.4.

The problem is related to this question: https://community.databricks.com/t5/data-engineering/replay-stream-to-migrate-to-liquid-cluster/m-p/...

By limiting the number of records we transform from bronze to silver, I was able to stream 100 records to silver with liquid clustering. So perhaps access control is not the issue. However, for the full transformation, where one record in bronze can produce perhaps 200,000 records in silver, the job either times out (with the G1GC garbage collector) or the driver restarts (with the parallel garbage collector). This job ran fine when the silver table was partitioned and Unity Catalog was not enabled.
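
Roughly, the kind of limiting I mean can be done on the Delta streaming source itself (the values here are just placeholders I have been experimenting with):

# Throttle the replay so each micro-batch pulls only a small slice of bronze,
# keeping the fan-out into silver (and driver memory) bounded.
reader = (spark.readStream.format("delta")
          .option("maxFilesPerTrigger", 1)        # at most one bronze file per micro-batch
          .option("maxBytesPerTrigger", "1g")     # soft cap on input bytes per micro-batch
          .load(bronze_path)
          .select("*", "_metadata"))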

Please reach out!

Kind regards, Endre

EndreM
New Contributor III

After granting more permissions for users to the table I also see this in the log:

chown: invalid user: ‘null:spark-users’
chown: invalid user: ‘null:spark-users’
do_all_setup_for_username: iptables command took 0.0015606880187988281 seconds
do_all_setup_for_username: set_process_identity took 0.0005297660827636719 seconds
Traceback (most recent call last):
File "<frozen runpy>", line 198, in _run_module_as_main
File "<frozen runpy>", line 88, in _run_code
File "/databricks/spark/python/pyspark/sql/connect/streaming/worker/foreach_batch_worker.py", line 154, in <module>
(sock_file, sock) = local_connect_and_auth(java_port, auth_secret)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/databricks/spark/python/pyspark/util.py", line 726, in local_connect_and_auth
raise PySparkRuntimeError(
pyspark.errors.exceptions.base.PySparkRuntimeError: [CANNOT_OPEN_SOCKET] Can not open socket: ["tried to connect to ('127.0.0.1', 45771), but an error occurred: [Errno 111] Connection refused"].
do_all_setup_for_username: iptables command took 0.0020797252655029297 seconds
do_all_setup_for_username: set_process_identity took 0.0008292198181152344 seconds
Streaming foreachBatch worker is starting with url unix:///databricks/sparkconnect/grpc.sock and root sessionId 8a167574-1d44-4777-9309-c455c7fceab2.
finished setting up the root session 8a167574-1d44-4777-9309-c455c7fceab2.

EndreM
New Contributor III

After increasing the compute to one with 500 GB of memory, the job was able to transfer roughly 300 GB of data, but it produced a large number of files: 26,000. The old table, partitioned and without liquid clustering, had 4,000 files for a total of 1.2 TB of data. Why does liquid clustering or Unity Catalog result in so many files being produced?

It looks like the job ran for more than 2 days, but there is no record of the job run in the logs, and subsequent runs of the job time out after 3 hours. The data is only 1.2 TB, and even with 500 GB of memory it looks like that is not enough... How can I resolve this issue?

There is a lot less flexibility and tooling available for debugging PySpark issues in Databricks than for debugging a Kafka pipeline. With Kafka you have the option to do low-level coding against Apache Kafka while still getting a lot of out-of-the-box functionality from the higher-level Kafka Streams library. Peeking under the hood of what is going on isn't really possible in Databricks. Quite frustrating.
