cancel
Showing results forย 
Search instead forย 
Did you mean:ย 
Warehousing & Analytics
Engage in discussions on data warehousing, analytics, and BI solutions within the Databricks Community. Share insights, tips, and best practices for leveraging data for informed decision-making.
cancel
Showing results forย 
Search instead forย 
Did you mean:ย 

Read Structured Streaming state information

jcozar
Contributor

Hi!

I am exploring the read state functionality in spark streaming: https://docs.databricks.com/en/structured-streaming/read-state.html

When I start a streaming query like this:

 

 

(
    ...
    .writeStream
    .option("checkpointLocation", f"{CHECKPOINTS_PATH}/experiment_2_2/")
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_2_2")
    .start()
)

 

 

I see it works correctly (I can query the table `display_experiment_2_2`, and see the query 

recentProgress information.

However, when I run this code to test the state store:

 

 

(
    spark
    .read
    .format("statestore")
    .load(f"{CHECKPOINTS_PATH}/experiment_2_2/")
)

 

 

it raises the following error:

 

 

2024-04-04 12:41:01,082 1522 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
    resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/checkpoints/experiment_2_2, batchId=1, operatorId=0, storeName=default, joinSide=none).
Rerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03"
	debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {grpc_message:"[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/checkpoints/experiment_2_2, batchId=1, operatorId=0, storeName=default, joinSide=none).\nRerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03", grpc_status:13, created_time:"2024-04-04T12:41:01.081634178+00:00"}"

 

 

The same with state-metadata. When I run this code

 

 

(
    spark
    .read
    .format("state-metadata")
    .load(f"{CHECKPOINTS_PATH}/experiment_2_2/")
)

 

 

it raises the following error:

 

 

2024-04-04 12:41:03,988 1522 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
    resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "Data source V2 relation is not supported on table acl or credential passthrough clusters. RelationV2[operatorId#27178L, operatorName#27179, stateStoreName#27180, numPartitions#27181, minBatchId#27182L, maxBatchId#27183L]  state-metadata-table"
	debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {created_time:"2024-04-04T12:41:03.988523804+00:00", grpc_status:13, grpc_message:"Data source V2 relation is not supported on table acl or credential passthrough clusters. RelationV2[operatorId#27178L, operatorName#27179, stateStoreName#27180, numPartitions#27181, minBatchId#27182L, maxBatchId#27183L]  state-metadata-table"}"

 

 

I am using a cluster with Shared Compute policy, Shared Access mode and Databricks Runtime version 14.3.

Is it a bug or am I doing something wrong?

Thank you very much!

1 REPLY 1

Hi @Retired_mod , thank you for your time!

I tried again. I used format("memory") and format("delta"), just in case, but the results are the same.

Error traceback using `dbfs:/tmp/checkpoints/experiment_2_2` for checkpoint using format("statestore"):

2024-04-07 09:11:50,672 1956 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
    resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/experiment_2_2, batchId=9, operatorId=0, storeName=default, joinSide=none).
Rerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03"
	debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {grpc_message:"[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/experiment_2_2, batchId=9, operatorId=0, storeName=default, joinSide=none).\nRerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03", grpc_status:13, created_time:"2024-04-07T09:11:50.671796067+00:00"}"
>

Error traceback using a azure storage account path for checkpoint using format("statestore"):

2024-04-07 09:06:55,381 1956 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
    resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=abfss://unitycatalog@mastercatalogdev.dfs.core.windows.net/tutorials/checkpoints/experiment_2_2, batchId=9, operatorId=0, storeName=default, joinSide=none).
Rerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03"
	debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {grpc_message:"[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=abfss://unitycatalog@mastercatalogdev.dfs.core.windows.net/tutorials/checkpoints/experiment_2_2, batchId=9, operatorId=0, storeName=default, joinSide=none).\nRerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03", grpc_status:13, created_time:"2024-04-07T09:06:55.381219715+00:00"}"
>

 If you need more details about the code, stream, or storage accounts just tell me.

Thank you very much!

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโ€™t want to miss the chance to attend and share knowledge.

If there isnโ€™t a group near you, start one and help create a community that brings people together.

Request a New Group