12-16-2023 09:12 PM
Dataset<Row> readDf = spark
    .readStream()
    .format("kafka")
    .options(...)
    .load();

readDf.writeStream()
    .outputMode(OutputMode.Update())
    .foreachBatch(...)
    .trigger(Trigger.ProcessingTime(1000L))
    .start();
I receive a batch of events, one per user ID. For each user event I need to read a Delta table from blob store 1, apply some transformations, and write the result as a Parquet file into blob store 2. Since the replication is independent for each user event, I parallelised the work with Java's parallelStream().
userEvents.parallelStream().forEach(userEvent -> {
    Dataset<Row> readEventDf = spark
        .read()
        .format("delta")
        .load(readPath);

    // perform some transformation on the above dataframe

    readEventDf.write()
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .save(writePath);
});
However, during testing I observed some weird behaviour. I pass two user events, say User-1 and User-2, and I can see two parallel threads processing them via parallelStream(). Processing completes successfully on the thread handling User-1, but the thread handling User-2 fails at the read() step itself with the error: Failed to find data source: delta. Please find packages at...
I checked that the SparkSession, SparkContext and SQLContext objects active in both threads are exactly the same, and the Spark conf properties set are also exactly the same.

If I remove the parallelism by changing the call from parallelStream() to a normal sequential stream(), everything works fine. So there is evidently no issue with the Spark/Delta libraries used, the connection to the blob store, or the file format.
Can someone help here to explain this behaviour and how to resolve it?
12-17-2023 08:15 AM - edited 12-17-2023 08:17 AM
The problem was indeed the way the context ClassLoader was set on the ForkJoinPool (common pool) threads. Spark, in SparkClassUtils, uses Thread.currentThread().getContextClassLoader(), which can return a different ClassLoader on another thread.

To solve it, I created my own ForkJoinWorkerThreadFactory implementation that sets each worker's context ClassLoader to that of the main thread, then built a custom ForkJoinPool from that factory and used it to execute the parallelStream().
public class MyForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {

    @Override
    public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
        final ForkJoinWorkerThread worker =
            ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
        worker.setName("my-fjpworker-" + worker.getPoolIndex());
        // Set the ClassLoader from the submitting thread
        worker.setContextClassLoader(Thread.currentThread().getContextClassLoader());
        return worker;
    }
}
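For completeness, here is a sketch (not from the original post) of how the custom pool can drive the parallelStream(): a parallel stream forked from inside a pool task runs in that pool rather than in ForkJoinPool.commonPool(), so the stream has to be wrapped in customPool.submit(...). The Spark read/write work is replaced by a placeholder print, and the factory above is repeated so the sketch compiles standalone.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;

public class CustomPoolDemo {

    // Same factory as above, repeated so this sketch is self-contained
    public static class MyForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {
        @Override
        public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
            final ForkJoinWorkerThread worker =
                ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
            worker.setName("my-fjpworker-" + worker.getPoolIndex());
            worker.setContextClassLoader(Thread.currentThread().getContextClassLoader());
            return worker;
        }
    }

    public static void main(String[] args) throws Exception {
        List<String> userEvents = List.of("User-1", "User-2");

        ForkJoinPool customPool =
            new ForkJoinPool(2, new MyForkJoinWorkerThreadFactory(), null, false);
        try {
            // Tasks forked inside a pool task run in that pool, not the common
            // pool; get() blocks until the whole stream has finished.
            customPool.submit(() ->
                userEvents.parallelStream().forEach(userEvent ->
                    // Placeholder for the per-event Delta read / Parquet write
                    System.out.println(userEvent + " on "
                        + Thread.currentThread().getName()))
            ).get();
        } finally {
            customPool.shutdown();
        }
    }
}
```

Getting the pool size right matters here: Spark actions block, so the pool's parallelism caps how many user events are replicated concurrently.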
Similar issue: JAXB class not found with ForkJoinPool Java
Code reference: https://stackoverflow.com/a/57551188/23113106
12-17-2023 07:06 AM
---UPDATE---
I tried printing the thread names and observed that User-1 is processed on the same thread as the main program, but User-2 is processed on a ForkJoin common-pool thread.

Could it be that the Spark conf (related to the Delta format) set in the MAIN thread is somehow not getting passed on to the new ForkJoinPool thread?
MAIN thread name:   stream execution thread for [id = 04970c13-5d49-441d-8bce-893481eed417, runId = 6cba7397-1113-47a6-bbff-479c8ea695fc]
User-1 thread name: stream execution thread for [id = 04970c13-5d49-441d-8bce-893481eed417, runId = 6cba7397-1113-47a6-bbff-479c8ea695fc]
User-2 thread name: ForkJoinPool.commonPool-worker-3
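One way to probe this locally (a diagnostic sketch, not from the original post) is to print the context ClassLoader seen by the main thread and by a common-pool worker. On a plain JVM both typically report the application ClassLoader, but on a driver that installs its own ClassLoader the common-pool worker can report a different one, which would explain why the Delta data source cannot be resolved on that thread:

```java
import java.util.concurrent.ForkJoinPool;

public class ContextClassLoaderCheck {
    public static void main(String[] args) throws Exception {
        // ClassLoader as seen from the main thread
        System.out.println("main   thread: " + Thread.currentThread().getName()
            + " -> " + Thread.currentThread().getContextClassLoader());

        // Same check from a ForkJoinPool common-pool worker thread
        ForkJoinPool.commonPool().submit(() ->
            System.out.println("worker thread: " + Thread.currentThread().getName()
                + " -> " + Thread.currentThread().getContextClassLoader())
        ).get();
    }
}
```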