Streaming query error - [STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA]

lprevost
Contributor III

[STREAM_FAILED] Query [id = 6a821fbc-490b-4ad8-891d-e4cacc2af1d6, runId = e055fede-8012-4369-861b-47183999e91d] terminated with exception: [STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA] Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query. Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)]; Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST

A streaming query that previously ran successfully failed with this error on my latest run. I think I may have introduced the problem when I added some "isEmpty" logic so that the query handles the case gracefully when there is no incremental data to merge. I say this because of this part of the error:

Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)]; Stateful operators in current batch: [].
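
To make the mismatch explicit, here is a minimal sketch in plain Python (not part of the streaming job) that pulls the two operator lists out of the message and shows which operator is missing from the current plan. The helper name `parse_stateful_operators` is hypothetical, and the parsing assumes the message keeps the exact wording quoted above.

```python
import re


def parse_stateful_operators(message):
    """Return (metadata_ops, current_ops) as lists of (operator_id, operator_name)."""
    pair = re.compile(r"OperatorId:\s*(\d+)\s*->\s*OperatorName:\s*(\w+)")

    def section(label):
        # Grab the bracketed list that follows the given label.
        match = re.search(re.escape(label) + r":\s*\[(.*?)\]", message)
        if match is None:
            return []
        return [(int(op_id), name) for op_id, name in pair.findall(match.group(1))]

    return (
        section("Stateful operators in the metadata"),
        section("Stateful operators in current batch"),
    )


# The fragment of the error quoted above.
error_text = (
    "Stateful operators in the metadata: "
    "[(OperatorId: 0 -> OperatorName: dedupe)]; "
    "Stateful operators in current batch: []."
)

metadata_ops, current_ops = parse_stateful_operators(error_text)
# Operators recorded in the checkpoint that the current query plan no longer contains.
missing = [op for op in metadata_ops if op not in current_ops]

print("In checkpoint metadata:", metadata_ops)
print("In current plan:", current_ops)
print("Missing from current plan:", missing)
```

Read this way, the checkpoint still records a dedupe operator with id 0, while the plan for the current run contains no stateful operators at all.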

My foreachBatch code:

from pyspark.sql import functions as F, types as T

def host_transform(df):
    return (df
         .selectExpr("rurl as host_rurl")
         .dropDuplicates(["host_rurl"])
         .withColumn("domain_rurl", extract.reverse(extract.sled_domain_extractor("host_rurl", reverse_url = True)))
         .withColumn("tags", F.array().cast(T.ArrayType(T.StringType(), True)))
    )
def upsertToDelta(batchdf, batchId):
  # Builds both host and domain unique rurl master tables, with an autogenerated long integer id as PK
  if not batchdf.isEmpty():
    batchdf = host_transform(batchdf)
    insert_values = {k:f"s.{k}" for k in batchdf.columns}
    print(f"Upsert being performed with batch {batchId} on {targetTableName} with target columns as {batchdf.columns}.")
    # targetTable and targetTableName must be set before each upsert
    (targetTable.alias("t").merge(
        batchdf.alias("s"),
        "s.host_rurl = t.host_rurl")
      .whenNotMatchedInsert(values= insert_values)  #all but id which is autogenerated
      .execute()
    )
  else:
    print(f"New batch {batchId} is empty.")