Hi!
I am exploring the state reader functionality in Spark Structured Streaming: https://docs.databricks.com/en/structured-streaming/read-state.html
When I start a streaming query like this:
(
    ...
    .writeStream
    .option("checkpointLocation", f"{CHECKPOINTS_PATH}/experiment_2_2/")
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_2_2")
    .start()
)
It works correctly: I can query the table `display_experiment_2_2` and see the query's recentProgress information.
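For reference, this is roughly how I verify both (a minimal sketch; `query` is a hypothetical handle that the `.start()` call above would return if assigned):

# Query the in-memory table populated by the memory sink
spark.sql("SELECT * FROM display_experiment_2_2").show()

# Inspect progress of the running query;
# `query` would be the StreamingQuery returned by .start()
for progress in query.recentProgress:
    print(progress)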
However, when I run this code to test the state store:
(
    spark
    .read
    .format("statestore")
    .load(f"{CHECKPOINTS_PATH}/experiment_2_2/")
)
it raises the following error:
2024-04-04 12:41:01,082 1522 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INTERNAL
details = "[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/checkpoints/experiment_2_2, batchId=1, operatorId=0, storeName=default, joinSide=none).
Rerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03"
debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {grpc_message:"[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/checkpoints/experiment_2_2, batchId=1, operatorId=0, storeName=default, joinSide=none).\nRerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03", grpc_status:13, created_time:"2024-04-04T12:41:01.081634178+00:00"}"
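If it helps, here is the equivalent call with the reader options spelled out explicitly (option names and values taken from the StateSourceOptions printed in the error above, so these should just mirror the defaults the reader resolved on its own):

(
    spark
    .read
    .format("statestore")
    # These mirror the resolved defaults from the error message above
    .option("batchId", 1)           # read state as of this micro-batch
    .option("operatorId", 0)        # which stateful operator in the query
    .option("storeName", "default")
    .load(f"{CHECKPOINTS_PATH}/experiment_2_2/")
)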
The same happens with the state-metadata reader. When I run this code
(
    spark
    .read
    .format("state-metadata")
    .load(f"{CHECKPOINTS_PATH}/experiment_2_2/")
)
it raises the following error:
2024-04-04 12:41:03,988 1522 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INTERNAL
details = "Data source V2 relation is not supported on table acl or credential passthrough clusters. RelationV2[operatorId#27178L, operatorName#27179, stateStoreName#27180, numPartitions#27181, minBatchId#27182L, maxBatchId#27183L] state-metadata-table"
debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {created_time:"2024-04-04T12:41:03.988523804+00:00", grpc_status:13, grpc_message:"Data source V2 relation is not supported on table acl or credential passthrough clusters. RelationV2[operatorId#27178L, operatorName#27179, stateStoreName#27180, numPartitions#27181, minBatchId#27182L, maxBatchId#27183L] state-metadata-table"}"
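In case it is relevant, the checkpoint directory can be inspected like this (`dbutils` and `display` are Databricks notebook built-ins):

# Sanity check: list what the checkpoint actually contains
display(dbutils.fs.ls(f"{CHECKPOINTS_PATH}/experiment_2_2/"))
# The per-operator state lives under the state/ subdirectory
display(dbutils.fs.ls(f"{CHECKPOINTS_PATH}/experiment_2_2/state/"))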
I am using a cluster with the Shared Compute policy, Shared access mode, and Databricks Runtime 14.3.
Is this a bug, or am I doing something wrong?
Thank you very much!