
Read Structured Streaming state information

jcozar
Contributor

Hi!

I am exploring the state reader functionality in Spark Structured Streaming: https://docs.databricks.com/en/structured-streaming/read-state.html

When I start a streaming query like this:

(
    ...
    .writeStream
    .option("checkpointLocation", f"{CHECKPOINTS_PATH}/experiment_2_2/")
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_2_2")
    .start()
)
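A minimal, self-contained query of the same shape is shown below for reference; the rate source and windowed aggregation are just illustrative stand-ins for the elided part of my pipeline, and CHECKPOINTS_PATH points at the same location as in the errors further down:

from pyspark.sql import functions as F

# Illustrative stand-in for the elided readStream/transformations above:
# a rate source with a watermarked window aggregation, so the query keeps state.
CHECKPOINTS_PATH = "dbfs:/tmp/checkpoints"

df = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
)

query = (
    df
    .withWatermark("timestamp", "1 minute")
    .groupBy(F.window("timestamp", "1 minute"))
    .count()
    .writeStream
    .option("checkpointLocation", f"{CHECKPOINTS_PATH}/experiment_2_2/")
    .outputMode("append")
    .format("memory")
    .queryName("display_experiment_2_2")
    .start()
)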

I see it works correctly (I can query the table `display_experiment_2_2` and see the query's recentProgress information).

However, when I run this code to test the state store:

(
    spark
    .read
    .format("statestore")
    .load(f"{CHECKPOINTS_PATH}/experiment_2_2/")
)

it raises the following error:

2024-04-04 12:41:01,082 1522 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
    resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/checkpoints/experiment_2_2, batchId=1, operatorId=0, storeName=default, joinSide=none).
Rerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03"
	debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {grpc_message:"[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/checkpoints/experiment_2_2, batchId=1, operatorId=0, storeName=default, joinSide=none).\nRerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03", grpc_status:13, created_time:"2024-04-04T12:41:01.081634178+00:00"}"
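For completeness, the statestore source also accepts the options that appear in StateSourceOptions in the error above (operatorId, storeName, batchId, joinSide); passing them explicitly looks roughly like this, with illustrative values:

(
    spark
    .read
    .format("statestore")
    .option("operatorId", 0)        # first stateful operator in the query
    .option("storeName", "default")
    .option("batchId", 1)           # the batch the reader resolved in the error above
    .load(f"{CHECKPOINTS_PATH}/experiment_2_2/")
)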

The same happens with the state-metadata format. When I run this code:

(
    spark
    .read
    .format("state-metadata")
    .load(f"{CHECKPOINTS_PATH}/experiment_2_2/")
)

it raises the following error:

2024-04-04 12:41:03,988 1522 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
    resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "Data source V2 relation is not supported on table acl or credential passthrough clusters. RelationV2[operatorId#27178L, operatorName#27179, stateStoreName#27180, numPartitions#27181, minBatchId#27182L, maxBatchId#27183L]  state-metadata-table"
	debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {created_time:"2024-04-04T12:41:03.988523804+00:00", grpc_status:13, grpc_message:"Data source V2 relation is not supported on table acl or credential passthrough clusters. RelationV2[operatorId#27178L, operatorName#27179, stateStoreName#27180, numPartitions#27181, minBatchId#27182L, maxBatchId#27183L]  state-metadata-table"}"
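For reference, when the state-metadata reader does work it returns rows describing each stateful operator, with the columns visible in the RelationV2 above (operatorId, operatorName, stateStoreName, numPartitions, minBatchId, maxBatchId); that output can then drive the statestore read, roughly like this:

meta = (
    spark
    .read
    .format("state-metadata")
    .load(f"{CHECKPOINTS_PATH}/experiment_2_2/")
)
row = meta.select("operatorId", "maxBatchId").first()

(
    spark
    .read
    .format("statestore")
    .option("operatorId", row["operatorId"])
    .option("batchId", row["maxBatchId"])
    .load(f"{CHECKPOINTS_PATH}/experiment_2_2/")
)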

I am using a cluster with the Shared Compute policy, Shared access mode, and Databricks Runtime 14.3.

Is it a bug or am I doing something wrong?

Thank you very much!

2 REPLIES

Kaniz_Fatma
Community Manager

Hi @jcozar

  • Execute the streaming query again to construct the state schema.
  • Ensure that the checkpoint location (dbfs:/tmp/checkpoints/experiment_2_2) is correct and accessible; one way to check this is sketched below.
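A quick sanity check along those lines, as a sketch (the exact state-directory layout can vary by runtime and operator, so adjust the paths as needed):

# Confirm the checkpoint is reachable and that state files were actually written
# for the first stateful operator (operatorId 0).
base = "dbfs:/tmp/checkpoints/experiment_2_2"
for entry in dbutils.fs.ls(base):
    print(entry.path)
for entry in dbutils.fs.ls(f"{base}/state/0"):
    print(entry.path)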

Hi @Kaniz_Fatma, thank you for your time!

I tried again. I used format("memory") and format("delta"), just in case, but the results are the same.
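For reference, the delta variant was roughly of this shape (the output path variable here is illustrative):

(
    df  # the same streaming DataFrame as in the original post
    .writeStream
    .option("checkpointLocation", f"{CHECKPOINTS_PATH}/experiment_2_2/")
    .outputMode("append")
    .format("delta")
    .option("path", f"{OUTPUT_PATH}/experiment_2_2")  # OUTPUT_PATH is illustrative
    .start()
)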

Error traceback when using `dbfs:/tmp/checkpoints/experiment_2_2` as the checkpoint location and reading with format("statestore"):

2024-04-07 09:11:50,672 1956 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
    resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/experiment_2_2, batchId=9, operatorId=0, storeName=default, joinSide=none).
Rerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03"
	debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {grpc_message:"[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=dbfs:/tmp/experiment_2_2, batchId=9, operatorId=0, storeName=default, joinSide=none).\nRerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03", grpc_status:13, created_time:"2024-04-07T09:11:50.671796067+00:00"}"
>

Error traceback when using an Azure storage account path as the checkpoint location and reading with format("statestore"):

2024-04-07 09:06:55,381 1956 ERROR _handle_rpc_error GRPC Error received
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/sql/connect/client/core.py", line 1339, in _analyze
    resp = self._stub.AnalyzePlan(req, metadata=self._builder.metadata())
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 946, in __call__
    return _end_unary_response_blocking(state, call, False, None)
  File "/databricks/python/lib/python3.10/site-packages/grpc/_channel.py", line 849, in _end_unary_response_blocking
    raise _InactiveRpcError(state)
grpc._channel._InactiveRpcError: <_InactiveRpcError of RPC that terminated with:
	status = StatusCode.INTERNAL
	details = "[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=abfss://unitycatalog@mastercatalogdev.dfs.core.windows.net/tutorials/checkpoints/experiment_2_2, batchId=9, operatorId=0, storeName=default, joinSide=none).
Rerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03"
	debug_error_string = "UNKNOWN:Error received from peer unix:/databricks/sparkconnect/grpc.sock {grpc_message:"[STDS_FAILED_TO_READ_STATE_SCHEMA] Failed to read the state schema. Either the file does not exist, or the file is corrupted. options: StateSourceOptions(checkpointLocation=abfss://unitycatalog@mastercatalogdev.dfs.core.windows.net/tutorials/checkpoints/experiment_2_2, batchId=9, operatorId=0, storeName=default, joinSide=none).\nRerun the streaming query to construct the state schema, and report to the corresponding communities or vendors if the error persists. SQLSTATE: 42K03", grpc_status:13, created_time:"2024-04-07T09:06:55.381219715+00:00"}"
>

If you need more details about the code, the stream, or the storage accounts, just tell me.

Thank you very much!
