[STREAM_FAILED] Query [id = 6a821fbc-490b-4ad8-891d-e4cacc2af1d6, runId = e055fede-8012-4369-861b-47183999e91d] terminated with exception: [STREAMING_STATEFUL_OPERATOR_NOT_MATCH_IN_STATE_METADATA] Streaming stateful operator name does not match with the operator in state metadata. This likely to happen when user adds/removes/changes stateful operator of existing streaming query. Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)]; Stateful operators in current batch: []. SQLSTATE: 42K03 SQLSTATE: XXKST
A streaming query that previously ran successfully now fails on my latest run with the error above. I suspect I introduced this when I added some "isEmpty" logic so that the query exits gracefully when there is no incremental data to merge. I say this because of this part of the error:
Stateful operators in the metadata: [(OperatorId: 0 -> OperatorName: dedupe)]; Stateful operators in current batch: [].
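To make that mismatch explicit, here is a minimal sketch in plain Python that pulls both operator lists out of the message and reports what the checkpoint expects but the current plan lacks. `parse_stateful_operators` is a hypothetical helper written for illustration, not a Spark API, and it assumes the message keeps exactly the format shown above.

```python
import re

# The relevant fragment of the error, copied from the message above.
ERROR_TEXT = (
    "Stateful operators in the metadata: "
    "[(OperatorId: 0 -> OperatorName: dedupe)]; "
    "Stateful operators in current batch: []."
)


def parse_stateful_operators(message):
    """Hypothetical helper (not part of Spark): return (metadata_ops, current_ops),
    each a list of (operator_id, operator_name) tuples found in the message."""

    def operators_after(label):
        # Grab the bracketed list that follows the given label.
        section = re.search(re.escape(label) + r":\s*\[(.*?)\]", message)
        if section is None:
            return []
        pairs = re.findall(
            r"OperatorId:\s*(\d+)\s*->\s*OperatorName:\s*(\w+)", section.group(1)
        )
        return [(int(op_id), name) for op_id, name in pairs]

    return (
        operators_after("Stateful operators in the metadata"),
        operators_after("Stateful operators in current batch"),
    )


metadata_ops, current_ops = parse_stateful_operators(ERROR_TEXT)
# Operators recorded in the checkpoint but absent from the current plan.
missing = [op for op in metadata_ops if op not in current_ops]
print(missing)  # [(0, 'dedupe')]
```

In other words, the checkpoint still records a `dedupe` operator with id 0, while the query as it is now defined contains no stateful operator at all.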
My foreachBatch code:
from pyspark.sql import functions as F, types as T
# `extract` is my own helper module (not shown here)

def host_transform(df):
    return (df
        .selectExpr("rurl as host_rurl")
        .dropDuplicates(["host_rurl"])
        .withColumn("domain_rurl", extract.reverse(extract.sled_domain_extractor("host_rurl", reverse_url=True)))
        .withColumn("tags", F.array().cast(T.ArrayType(T.StringType(), True)))
    )
def upsertToDelta(batchdf, batchId):
    # Builds both the host and domain unique-rurl master tables, with an autogenerated long int id as PK
    if not batchdf.isEmpty():
        batchdf = host_transform(batchdf)
        insert_values = {k: f"s.{k}" for k in batchdf.columns}
        print(f"Upsert being performed with batch {batchId} on {targetTableName} with target columns as {batchdf.columns}.")
        # targetTable and targetTableName must be set before each upsert
        (targetTable.alias("t").merge(
                batchdf.alias("s"),
                "s.host_rurl = t.host_rurl")
            .whenNotMatchedInsert(values=insert_values)  # all columns except id, which is autogenerated
            .execute()
        )
    else:
        print(f"New batch {batchId} is empty.")