04-04-2024 05:54 AM
Hi!
I am exploring the read state functionality in Spark Structured Streaming: https://docs.databricks.com/en/structured-streaming/read-state.html
When I start a streaming query like this:
(
    ...
    .writeStream
    .option("checkpointLocation", f"{CHECKPOINTS_PATH}/experiment_2_2/")
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_2_2")
    .start()
)
I see it works correctly (I can query the table `display_experiment_2_2` and see the query results).
However, when I run this code to test the state store:
(
    spark
    .read
    .format("statestore")
    .load(f"{CHECKPOINTS_PATH}/experiment_2_2/")
)
it raises the following error:
2024-04-04 12:41:01,082 1522 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INTERNAL
details = "[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/checkpoints/experiment_2_2, batchId=1, operatorId=0, storeName=default, joinSide=none).
Rerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03"
debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {grpc_message:"[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/checkpoints/experiment_2_2, batchId=1, operatorId=0, storeName=default, joinSide=none).\nRerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03", grpc_status:13, created_time:"2024-04-04T12:41:01.081634178+00:00"}"
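For reference, the `StateSourceOptions(...)` echoed in the error (`batchId`, `operatorId`, `storeName`, `joinSide`) correspond to reader options that can also be set explicitly. Below is a minimal sketch, assuming an existing `spark` session; `read_state` is a hypothetical helper name, not part of any API, and setting the options explicitly is not claimed to fix the error above. It only makes it clear which operator and batch the reader is asked for.

```python
def read_state(spark, checkpoint_path, operator_id=0, batch_id=None):
    """Build a state store read with explicit options (hypothetical helper)."""
    reader = (
        spark.read
        .format("statestore")
        # Which stateful operator's state to read; 0 is the value in the error.
        .option("operatorId", operator_id)
    )
    if batch_id is not None:
        # Pin the read to one committed batch instead of the default.
        reader = reader.option("batchId", batch_id)
    return reader.load(checkpoint_path)
```

Usage would look like `read_state(spark, f"{CHECKPOINTS_PATH}/experiment_2_2/", batch_id=1)`.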
The same happens with state-metadata. When I run this code:
(
    spark
    .read
    .format("state-metadata")
    .load(f"{CHECKPOINTS_PATH}/experiment_2_2/")
)
it raises the following error:
2024-04-04 12:41:03,988 1522 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INTERNAL
details = "Data source V2 relation is not supported on table acl or credential passthrough clusters. RelationV2[operatorId#27178L, operatorName#27179, stateStoreName#27180, numPartitions#27181, minBatchId#27182L, maxBatchId#27183L] state-metadata-table"
debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {created_time:"2024-04-04T12:41:03.988523804+00:00", grpc_status:13, grpc_message:"Data source V2 relation is not supported on table acl or credential passthrough clusters. RelationV2[operatorId#27178L, operatorName#27179, stateStoreName#27180, numPartitions#27181, minBatchId#27182L, maxBatchId#27183L] state-metadata-table"}"
I am using a cluster with Shared Compute policy, Shared Access mode and Databricks Runtime version 14.3.
Is it a bug or am I doing something wrong?
Thank you very much!
04-05-2024 12:43 PM
Hi @jcozar,
dbfs:/tmp/checkpoints/experiment_2_2
) is correct and accessible.
04-07-2024 02:12 AM
Hi @Kaniz_Fatma , thank you for your time!
I tried again. I used format("memory") and format("delta"), just in case, but the results are the same.
Error traceback with `dbfs:/tmp/checkpoints/experiment_2_2` as the checkpoint location, reading with format("statestore"):
2024-04-07 09:11:50,672 1956 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INTERNAL
details = "[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/experiment_2_2, batchId=9, operatorId=0, storeName=default, joinSide=none).
Rerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03"
debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {grpc_message:"[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/experiment_2_2, batchId=9, operatorId=0, storeName=default, joinSide=none).\nRerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03", grpc_status:13, created_time:"2024-04-07T09:11:50.671796067+00:00"}"
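Note that the traceback above reports `checkpointLocation=dbfs:/tmp/experiment_2_2`, while the path mentioned in the text is `dbfs:/tmp/checkpoints/experiment_2_2`. This may just be a difference in how the run was written up, but it is worth comparing the two. A minimal sketch for pulling the options out of the error text follows; `parse_state_source_options` is a hypothetical helper, and the parsing assumes the `key=value, key=value` layout seen in the messages in this thread.

```python
import re

def parse_state_source_options(message):
    """Extract key=value pairs from a StateSourceOptions(...) fragment."""
    match = re.search(r"StateSourceOptions\(([^)]*)\)", message)
    if match is None:
        return {}
    options = {}
    for item in match.group(1).split(", "):
        if "=" in item:
            key, value = item.split("=", 1)
            options[key.strip()] = value.strip()
    return options

message = (
    "Failed to read the state schema. options: "
    "StateSourceOptions(checkpointLocation=dbfs:/tmp/experiment_2_2, "
    "batchId=9, operatorId=0, storeName=default, joinSide=none)."
)
options = parse_state_source_options(message)
intended = "dbfs:/tmp/checkpoints/experiment_2_2"
# True when the reader resolved the path the query actually wrote to.
same_location = options["checkpointLocation"] == intended
```

Here `same_location` is `False`, since the location in the message lacks the `checkpoints/` segment.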
Error traceback with an Azure storage account path as the checkpoint location, reading with format("statestore"):
2024-04-07 09:06:55,381 1956 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
return _end_unary_response_blocking(state, call, False, None)
File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
status = StatusCode.INTERNAL
details = "[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=abfss://unitycatalog@mastercatalogdev.dfs.core.windows.net/tutorials/checkpoints/experiment_2_2, batchId=9, operatorId=0, storeName=default, joinSide=none).
Rerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03"
debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {grpc_message:"[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=abfss://unitycatalog@mastercatalogdev.dfs.core.windows.net/tutorials/checkpoints/experiment_2_2, batchId=9, operatorId=0, storeName=default, joinSide=none).\nRerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03", grpc_status:13, created_time:"2024-04-07T09:06:55.381219715+00:00"}"
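Since the message says the schema file either does not exist or is corrupted, one thing to check is whether that file is present under the checkpoint. The sketch below only builds the path to look for. The layout `<checkpoint>/state/<operatorId>/<partitionId>/_metadata/schema` is an assumption based on how open-source Spark lays out state store checkpoints and may differ on Databricks; `expected_state_schema_path` is a hypothetical helper. A query without stateful operators would not be expected to have a `state` directory at all.

```python
import posixpath

def expected_state_schema_path(checkpoint_location, operator_id=0, partition_id=0):
    """Return the assumed location of the state schema file for one operator."""
    # Assumed layout: <checkpoint>/state/<operatorId>/<partitionId>/_metadata/schema
    return posixpath.join(
        checkpoint_location.rstrip("/"),
        "state",
        str(operator_id),
        str(partition_id),
        "_metadata",
        "schema",
    )

schema_path = expected_state_schema_path("dbfs:/tmp/checkpoints/experiment_2_2/")
```

The resulting path can then be compared against a listing of the checkpoint directory.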
If you need more details about the code, stream, or storage accounts just tell me.
Thank you very much!