05-24-2024 03:15 AM
Recently we ran into an issue with foreachBatch after upgrading our Azure Databricks cluster to runtime 14 (Spark 3.5) with Shared access mode and Unity Catalog.
The issue manifested as a ModuleNotFoundError whenever foreachBatch called a function that uses an object which is not declared within that function's scope but in another module:
SparkConnectGrpcException: (org.apache.spark.api.python.StreamingPythonRunner$StreamingPythonRunnerInitializationException)
[STREAMING_PYTHON_RUNNER_INITIALIZATION_FAILURE] Streaming Runner initialization failed, returned -2.
Cause: Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 193, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 571, in loads
    return cloudpickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'foreach_batch_test'
So, after banging my head against the wall for some time, I finally accepted that this could be a bug in Databricks.
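The traceback is raised by cloudpickle.loads in the Python process that appears to run the foreachBatch function. Like the standard pickle module, cloudpickle serializes classes and functions that live in an importable module by reference (module name plus qualified name), so whichever process unpickles them must be able to import that module. The name in the error suggests the repro lived in a module called foreach_batch_test, though that is an inference. The stdlib-only sketch below reproduces the same failure mode; the module name foreach_batch_test_demo is hypothetical, and deleting it from sys.modules stands in for a process where the module is not installed.

```python
import pickle
import sys
import types

# Build a throwaway module in memory to stand in for the user's .py file.
# The module name is hypothetical.
mod = types.ModuleType("foreach_batch_test_demo")
exec(
    "class SomeConfiguration:\n"
    "    def __init__(self, name):\n"
    "        self.name = name\n",
    mod.__dict__,
)
sys.modules[mod.__name__] = mod

# Pickling an instance stores only a *reference* to its class
# (module name + qualified name), not the class definition itself.
payload = pickle.dumps(mod.SomeConfiguration("Johnny"))

# Simulate the receiving process, where the module cannot be imported.
del sys.modules[mod.__name__]


def try_load(data: bytes) -> str:
    """Unpickle `data` and return the config name, or the import error text."""
    try:
        return pickle.loads(data).name
    except ModuleNotFoundError as exc:
        return f"ModuleNotFoundError: {exc}"


print(try_load(payload))  # prints: ModuleNotFoundError: No module named 'foreach_batch_test_demo'
```

Re-registering the module in sys.modules makes the same payload load again, which is consistent with the error depending on the environment rather than on the code.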
Then, while I was compiling the report today, everything started working again??
Can anyone provide some details about what happened?
Cheers, thanks
05-27-2024 12:53 AM
@mjar 
Which DBR version are you using, exactly?
To use foreachBatch on shared clusters you need at least DBR 14.2.
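The 14.2 threshold can be checked at the start of a job. A minimal sketch, assuming the runtime exposes its version in the DATABRICKS_RUNTIME_VERSION environment variable (for example "14.3") and comparing only the leading major.minor numbers; the helper names are hypothetical.

```python
import os
from typing import Optional, Tuple

# Minimum DBR version for foreachBatch on shared clusters, per the reply above.
MIN_DBR_FOR_SHARED_FOREACH_BATCH: Tuple[int, int] = (14, 2)


def parse_dbr_version(version: str) -> Tuple[int, int]:
    """Return (major, minor) from strings such as "14.3" or "15.4.x-scala2.12"."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)


def supports_shared_foreach_batch(version: Optional[str]) -> bool:
    """True if `version` is at least the minimum DBR version; False if unknown."""
    if not version:
        return False
    return parse_dbr_version(version) >= MIN_DBR_FOR_SHARED_FOREACH_BATCH


if __name__ == "__main__":
    # Assumption: DATABRICKS_RUNTIME_VERSION is set on Databricks compute.
    runtime = os.environ.get("DATABRICKS_RUNTIME_VERSION")
    print(runtime, supports_shared_foreach_batch(runtime))
```

Tuples compare element by element, so (14, 10) correctly counts as newer than (14, 2), which a plain string comparison would get wrong.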
05-27-2024 01:09 AM
Hi @daniel_sahal, thanks for getting back to me.
We are using 14.3, Spark 3.5.0, Scala 2.12
05-27-2024 04:42 AM
@mjar 
Okay, DBR version should not be an issue then.
Could you share a code snippet here?
05-27-2024 06:17 AM
 
Below you can find minimal code that reproduces the scenario which used to cause the error.
Keep in mind that this code suddenly started working as expected; it was failing before I posted this topic.
In any case, here are a few words on what we are doing.
We need the streaming query to be processed by a function passed to foreachBatch, and this function has to be configurable (i.e. we need to pass it an object holding some configuration arguments).
In the example below we simulate this with a higher-order function that takes an instance of SomeConfiguration.
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col


class SomeConfiguration:
    """Holds the configuration arguments passed to the batch handler."""

    def __init__(self, name: str):
        self.name = name


def process_batch(config: SomeConfiguration):
    # Higher-order function: returns a foreachBatch handler that closes over `config`.
    def say_hello_foreach_microbatch(micro_batch_df: DataFrame, micro_batch_id: int):
        print(f"Hello {config.name}!")  # the line that used to fail
        print(f"The batch {micro_batch_id} has {micro_batch_df.count()} items.")

    return say_hello_foreach_microbatch


def main():
    spark = SparkSession.builder.getOrCreate()

    # Stream newly inserted "Staged" rows from the table's change data feed.
    data_stream = (
        spark.readStream.format("delta")
        .option("readChangeFeed", "true")
        .option("ignoreChanges", "true")
        .table("SOME_DELTA_TABLE")
        .filter(col("status") == "Staged")
        .filter(col("_change_type") == "insert")
    )

    (
        data_stream.writeStream
        .option("checkpointLocation", "SOME_CHECK_POINT_LOCATION")
        .foreachBatch(process_batch(SomeConfiguration("Johnny")))
        .outputMode("append")
        .trigger(availableNow=True)
        .start()
        .awaitTermination()
    )


if __name__ == "__main__":
    main()
The code above used to fail on the line that references the SomeConfiguration instance, i.e. print(f"Hello {config.name}!") inside the say_hello_foreach_microbatch function.
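Since the failure points at the closure's reference to a SomeConfiguration instance, one possible workaround (not verified on Databricks) is to let the handler close over built-in values only, so nothing in the pickled closure refers to a class defined in the user's module. A minimal sketch; process_batch_from_dict is a hypothetical name, and the handler otherwise mirrors say_hello_foreach_microbatch.

```python
from typing import Any, Callable, Dict


def process_batch_from_dict(config: Dict[str, Any]) -> Callable[[Any, int], None]:
    # Copy the settings into a plain dict of built-in values so the closure
    # holds no instance of a class from the user's module.
    settings = dict(config)

    def say_hello_foreach_microbatch(micro_batch_df, micro_batch_id: int) -> None:
        print(f"Hello {settings['name']}!")
        print(f"The batch {micro_batch_id} has {micro_batch_df.count()} items.")

    return say_hello_foreach_microbatch
```

With the existing class, the call site could become .foreachBatch(process_batch_from_dict(vars(SomeConfiguration("Johnny")))), since vars() returns the instance's attribute dict. This only helps if the configuration values themselves are built-in types.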
The same code suddenly started working, even though there were no obvious changes to the cluster and definitely no changes to our code.
I was just curious whether anyone knew anything.
In this case it went from bad to better, but I am a bit concerned that the cluster could just as well change its behaviour from good to bad, outside our control and without any official release.
06-24-2024 05:03 AM
@mjar I have exactly the same issue... have you found a solution in the meantime?
06-25-2024 01:35 AM
Hi @Nastia, unfortunately I don't have any answers yet.
I do have a channel open with Databricks, but no news yet.
On the plus side (for us, at least), our workflows have kept working as expected since the "magic fix" occurred in our environments.
09-02-2024 12:35 PM
I am facing this issue with Scala Spark streaming on a shared cluster with the 15.4 LTS runtime. Is there any fix or alternative for this? I can't use an assigned cluster because my table has masked columns, and my company hasn't enabled serverless in our workspaces yet.
03-04-2025 04:09 PM
Hi,
Any news regarding this issue? I have the same problem on a job cluster with 15.4 LTS when using asset bundles: foreachBatch is defined in a .py file and called from a notebook. When the same code is located in the notebook, it works fine.
(prep_silver_df(bronze_table_fqn_df)
.writeStream
.trigger(availableNow=True)
.foreachBatch(lambda df, batchId: upsertToDelta(df, batchId, silver_table_fqn))
.option("checkpointLocation", silver_checkpoint_path)
.outputMode("update")
.start()
.awaitTermination()
)
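A plausible explanation for the notebook vs. .py difference (not confirmed here): a function defined in the notebook lives in __main__ and is serialized by value, whereas upsertToDelta imported from the bundle's .py file is serialized by reference, so the process running foreachBatch must be able to import that module. cloudpickle 2.0 and later provide register_pickle_by_value(module) to force by-value serialization of a module's functions and classes. The sketch below applies it to PySpark's bundled copy (pyspark.cloudpickle); whether this helps on shared or serverless compute is an untested assumption, and the helper name register_module_by_value is hypothetical.

```python
import importlib


def register_module_by_value(module_name: str) -> bool:
    """Ask PySpark's bundled cloudpickle to serialize `module_name` by value.

    Returns False when PySpark or the API is unavailable, so callers can fall
    back to another workaround (e.g. defining the function in the notebook).
    """
    try:
        # PySpark vendors its own cloudpickle; registering the module with the
        # standalone `cloudpickle` package would not affect PySpark's serializer.
        from pyspark import cloudpickle
    except ImportError:
        return False
    register = getattr(cloudpickle, "register_pickle_by_value", None)
    if register is None:  # older cloudpickle versions lack this function
        return False
    # The module must be importable (and imported) in the calling process.
    register(importlib.import_module(module_name))
    return True
```

It would be called in the notebook before .start(), with the name of the bundle module that defines upsertToDelta (for example a hypothetical register_module_by_value("my_bundle_module")). Anything that module imports in turn is still pickled by reference unless it is registered too.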
03-09-2025 11:54 PM
No news here, although everything works fine on our clusters.
05-13-2025 11:43 PM
I am having the same issue using serverless compute. I think the issue comes from the limitations described in this documentation.