Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Streaming query error - [STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA]

lprevost
Contributor II

[STREAM_FAILED] Query [id = 6a821fbc-490b-4ad8-891d-e4cacc2af1d6, runId = e055fede-8012-4369-861b-47183999e91d] terminated with exception: [STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA] Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query. Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)]; Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST

My latest run of a previously successful streaming query failed with this error. I think the problem may have been introduced when I added some "isEmpty" logic so that the query handles the case of no incremental data to merge gracefully. I say this because of this part of the error:

Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)]; Stateful operators in current batch: [].

My foreachBatch code:

from pyspark.sql import functions as F
from pyspark.sql import types as T

# `extract` is my own helper module (not shown in this post).

def host_transform(df):
    # Called from foreachBatch, so dropDuplicates runs on a static micro-batch DataFrame
    return (df
        .selectExpr("rurl as host_rurl")
        .dropDuplicates(["host_rurl"])
        .withColumn("domain_rurl", extract.reverse(extract.sled_domain_extractor("host_rurl", reverse_url=True)))
        .withColumn("tags", F.array().cast(T.ArrayType(T.StringType(), True)))
    )

def upsertToDelta(batchdf, batchId):
    # Builds the host and domain master tables of unique rurls, with an autogenerated long int id as PK
    if not batchdf.isEmpty():
        batchdf = host_transform(batchdf)
        insert_values = {k: f"s.{k}" for k in batchdf.columns}
        print(f"Upsert being performed with batch {batchId} on {targetTableName} with target columns as {batchdf.columns}.")
        # targetTable and targetTableName must be set before each upsert
        (targetTable.alias("t")
            .merge(batchdf.alias("s"), "s.host_rurl = t.host_rurl")
            .whenNotMatchedInsert(values=insert_values)  # all columns except id, which is autogenerated
            .execute())
    else:
        print(f"New batch {batchId} is empty.")

 

1 Reply

Saritha_S
Databricks Employee

Hi @lprevost 

Good day!

Please find my analysis of your issue below.


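Root cause:
The checkpoint's state metadata records one stateful operator (OperatorId 0, a "dedupe" operator), but the current query plan contains none, so the query fails instead of reusing incompatible state. This STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA error is expected whenever a stateful operator such as dropDuplicates (alias drop_duplicates) is removed from an existing streaming query. Note that the dropDuplicates call inside your foreachBatch function does not count: there it runs on a static micro-batch DataFrame and keeps no streaming state, and the isEmpty check does not change the streaming plan either. The "dedupe" entry therefore comes from an earlier version of the query that applied dropDuplicates to the streaming DataFrame itself.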
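To make the comparison in the error message concrete, here is a deliberately simplified, pure-Python model of the check. It is not Spark's implementation (the real validation happens inside the engine and may check more than this); `StatefulOperator`, `format_operators`, and `check_operators_match` are hypothetical names used only for illustration. The scenario assumes an earlier run applied dropDuplicates to the streaming DataFrame and the current run deduplicates only inside foreachBatch.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class StatefulOperator:
    """Simplified stand-in for one entry in a checkpoint's state metadata."""
    operator_id: int
    operator_name: str


def format_operators(ops):
    """Render an {id: name} mapping the way the error message prints it."""
    entries = ", ".join(
        f"(OperatorId: {op_id} -> OperatorName: {name})"
        for op_id, name in sorted(ops.items())
    )
    return f"[{entries}]"


def check_operators_match(metadata_ops, current_ops):
    """Toy model of the checkpoint compatibility check, NOT Spark's code.

    Raises ValueError if the stateful operators recorded in the checkpoint
    differ (by id or name) from those in the current streaming plan.
    """
    recorded = {op.operator_id: op.operator_name for op in metadata_ops}
    current = {op.operator_id: op.operator_name for op in current_ops}
    if recorded != current:
        raise ValueError(
            f"Stateful operators in the metadata: {format_operators(recorded)}; "
            f"Stateful operators in current batch: {format_operators(current)}."
        )


# Assumed scenario: an earlier run applied dropDuplicates to the streaming
# DataFrame, so the checkpoint recorded one "dedupe" operator.
checkpoint_metadata = [StatefulOperator(0, "dedupe")]

# The current plan deduplicates only inside foreachBatch, which adds no
# streaming stateful operator.
current_plan = []

try:
    check_operators_match(checkpoint_metadata, current_plan)
except ValueError as err:
    # prints: Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)]; Stateful operators in current batch: [].
    print(err)
```

Removing the operator from the plan (or adding, reordering, or renaming one) makes the two sides differ, which is why the existing checkpoint can no longer be used as-is.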

Solution and recommendation:
The only recommended and reliable fix is to restart the streaming query with a new checkpoint location. A fresh checkpoint gives the query a clean slate and avoids unforeseen problems caused by mismatched or corrupted state.
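One way to follow this advice without editing paths by hand is to derive the checkpoint location from the list of stateful operators you expect the query to have, so that changing that list automatically points the query at a new, empty checkpoint directory. This is a minimal sketch under stated assumptions: `checkpoint_location`, the base path, the query name, and `source_df` are hypothetical, and the operator list is maintained by you rather than read from Spark.

```python
import hashlib
import json


def checkpoint_location(base_path, query_name, stateful_operators):
    """Hypothetical helper: build a checkpoint path tied to the stateful operators.

    `stateful_operators` is a list you maintain by hand, for example ["dedupe"]
    while dropDuplicates runs on the streaming DataFrame and [] once it only
    runs inside foreachBatch. A different list gives a different directory,
    so the restarted query starts from a fresh checkpoint.
    """
    # json.dumps keeps the encoding unambiguous (["a|b"] vs ["a", "b"]).
    fingerprint = hashlib.sha256(
        json.dumps(stateful_operators).encode("utf-8")
    ).hexdigest()[:12]
    return f"{base_path.rstrip('/')}/{query_name}/ops-{fingerprint}"


old_path = checkpoint_location("/checkpoints", "host_upsert", ["dedupe"])
new_path = checkpoint_location("/checkpoints", "host_upsert", [])
print(old_path != new_path)  # True: dropping "dedupe" selects a new checkpoint

# Hypothetical wiring (requires a Spark session; `source_df` is assumed):
# (source_df.writeStream
#     .foreachBatch(upsertToDelta)
#     .option("checkpointLocation", new_path)
#     .trigger(availableNow=True)
#     .start())
```

With a new checkpoint, the source starts again from its configured starting position, so earlier data may be replayed. In the code above, each batch is deduplicated on host_rurl and the MERGE only inserts keys not already in the target, so replayed data should not create duplicate host rows. The old checkpoint directory is left untouched and can be deleted separately.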
