Below is the minimal code to reproduce the scenario that used to cause the error.
Keep in mind that it suddenly started working as expected; it was failing right up until I posted this topic.
In any case, a few words on what we are doing.
We need a streaming query to be processed by the function passed to foreachBatch, and that function has to be configurable (i.e. we need to pass it an object with some configuration arguments).
In the example below we simulate this with a higher-order function that takes an instance of SomeConfiguration.
```python
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col


class SomeConfiguration:
    def __init__(self, name: str):
        self.name = name


def process_batch(config: SomeConfiguration):
    # Returns a foreachBatch handler that closes over `config`.
    def say_hello_foreach_microbatch(micro_batch_df: DataFrame, micro_batch_id: int):
        print(f"Hello {config.name}!")
        print(f"The batch {micro_batch_id} has {micro_batch_df.count()} items.")

    return say_hello_foreach_microbatch


def main():
    spark = SparkSession.builder.getOrCreate()

    # Read inserted rows with status "Staged" from the table's change data feed.
    data_stream = (
        spark.readStream.format("delta")
        .option("readChangeFeed", "true")
        .option("ignoreChanges", "true")
        .table("SOME_DELTA_TABLE")
        .filter(col("status") == "Staged")
        .filter(col("_change_type") == "insert")
    )

    (
        data_stream.writeStream
        .option("checkpointLocation", "SOME_CHECK_POINT_LOCATION")
        .foreachBatch(process_batch(SomeConfiguration("Johnny")))
        .outputMode("append")
        .trigger(availableNow=True)
        .start()
        .awaitTermination()
    )


if __name__ == "__main__":
    main()
```
The code above used to fail on the line that references the SomeConfiguration instance, i.e. print(f"Hello {config.name}!") inside the say_hello_foreach_microbatch function.
The same code suddenly started working, even though there were no obvious changes to the cluster and definitely no changes to our code.
I am just curious whether anyone knows anything about this.
In this case the change was for the better, but I am a bit concerned that the cluster's behaviour could just as easily change from good to bad, outside our control and without any official release.
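For comparison, the configurable handler described at the top of the post can also be sketched without a nested closure, by binding the configuration to a module-level function with functools.partial. This is only an alternative way to express the same pattern, not a known fix for the original error. Turning SomeConfiguration into a frozen dataclass is an assumption made for the sketch, and the code is plain Python (no pyspark import) so it can be tried without a cluster.

```python
import functools
from dataclasses import dataclass


@dataclass(frozen=True)
class SomeConfiguration:
    # Assumption: a dataclass version of the configuration object from the post.
    name: str


def say_hello_foreach_microbatch(config, micro_batch_df, micro_batch_id):
    # `config` is bound in advance by functools.partial; foreachBatch then
    # calls the partial with (micro_batch_df, micro_batch_id) for each batch.
    message = (
        f"Hello {config.name}! "
        f"The batch {micro_batch_id} has {micro_batch_df.count()} items."
    )
    print(message)
    return message


def process_batch(config):
    # A partial over a module-level function instead of a nested closure.
    return functools.partial(say_hello_foreach_microbatch, config)
```

It would be wired up exactly as before: .foreachBatch(process_batch(SomeConfiguration("Johnny"))). The partial exposes the bound configuration as handler.args[0], which can make it easier to inspect in a debugger than a closure cell.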