Spark read with format as "delta" isn't working with Java multithreading

kartik-chandra
New Contributor III
I have a Spark application (written in Java) that replicates data from one blob storage to another. It runs a readStream() that listens continuously to a Kafka topic for incoming events; the corresponding writeStream processes each micro-batch via the foreachBatch() option.
Dataset<Row> readDf = spark
        .readStream()
        .format("kafka")
        .options(...)
        .load();

readDf.writeStream()
      .outputMode(OutputMode.Update())
      .foreachBatch(...)
      .trigger(Trigger.ProcessingTime(1000L))
      .start();

I receive a batch of events, one per user ID. For each user event I need to read a Delta table from blob store 1, apply some transformation, and write the result as a Parquet file into blob store 2. Since the replication is independent for each user event, I parallelize it with Java's parallelStream().

userEvents.parallelStream().forEach(userEvent -> {
      Dataset<Row> readEventDf = spark
           .read()
           .format("delta")
           .load(readPath);

//perform some transformation on the above dataframe

      readEventDf.write()
        .format("parquet")
        .mode(SaveMode.Overwrite)
        .save(writePath);
});

However, during testing I am observing some strange behaviour. I pass two user events, say User-1 and User-2, and I can see two parallel threads processing them via parallelStream(). Processing completes successfully on one thread for User-1, but the other thread fails for User-2 at the read() step itself with the error: Failed to find data source: delta. Please find packages at...

I checked that the SparkSession, SparkContext, and SQLContext objects active in both threads are exactly the same, and that the Spark conf properties are identical as well.

If I remove the parallelism by switching from parallelStream() to a normal sequential stream(), everything works fine. So the issue is not with the Spark/Delta libraries, the connection to the blob store, or the file format.

Can someone help here to explain this behaviour and how to resolve it?

ACCEPTED SOLUTION

kartik-chandra
New Contributor III

The problem was indeed the way the context ClassLoader was set on the ForkJoinPool (common pool) threads. Spark's SparkClassUtils uses Thread.currentThread().getContextClassLoader(), which can return a different loader on a common-pool worker thread.

To solve it, I created my own ForkJoinWorkerThreadFactory implementation that sets each worker's context ClassLoader to that of the main thread, and then ran the parallelStream inside a custom ForkJoinPool built with that factory.

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;

public class MyForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {

  @Override
  public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
    final ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
    worker.setName("my-fjpworker-" + worker.getPoolIndex());
    // Propagate the context ClassLoader so that Spark's data-source lookup
    // (e.g. "delta") resolves on the worker threads as well.
    worker.setContextClassLoader(Thread.currentThread().getContextClassLoader());
    return worker;
  }
}
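For completeness, here is a minimal, Spark-free sketch of how this idea can be wired up (class and variable names are hypothetical). The parallelStream is submitted to a custom ForkJoinPool whose workers inherit the main thread's context ClassLoader; this relies on the widely used, though not formally specified, behaviour that a parallelStream started inside a ForkJoinPool task executes in that pool rather than in the common pool.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;

public class DeltaReplicationPool {
  public static void main(String[] args) throws Exception {
    // Hypothetical event list; in the real job these come from foreachBatch().
    List<String> userEvents = List.of("User-1", "User-2");

    // Capture the main thread's ClassLoader up front and hand it to every worker.
    final ClassLoader mainLoader = Thread.currentThread().getContextClassLoader();
    ForkJoinPool pool = new ForkJoinPool(
        Runtime.getRuntime().availableProcessors(),
        p -> {
          ForkJoinWorkerThread worker = ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(p);
          worker.setContextClassLoader(mainLoader);
          return worker;
        },
        null,    // default uncaught-exception handler
        false);  // asyncMode off

    try {
      // A parallelStream started from inside a ForkJoinPool task runs its work
      // in that pool rather than in ForkJoinPool.commonPool().
      pool.submit(() ->
          userEvents.parallelStream().forEach(event ->
              // The spark.read().format("delta") ... write() logic would go here.
              System.out.println(event + " handled by " + Thread.currentThread().getName()))
      ).get();
    } finally {
      pool.shutdown();
    }
  }
}
```

In the real job, the body of the forEach would contain the Delta read and Parquet write shown in the question.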

Similar issue: JAXB class not found with ForkJoinPool Java

Code reference: https://stackoverflow.com/a/57551188/23113106



 


REPLIES

kartik-chandra
New Contributor III

---UPDATE---

I tried printing the thread names and observed that User-1 is processed on the same thread as the main program, whereas User-2 is processed on a ForkJoin common-pool thread.

Could it be that the Spark conf (related to the Delta format) set in the MAIN thread is somehow not being passed on to the new ForkJoinPool thread?

MAIN thread name: stream execution thread for [id = 04970c13-5d49-441d-8bce-893481eed417, runId = 6cba7397-1113-47a6-bbff-479c8ea695fc]

user-1 thread name: stream execution thread for [id = 04970c13-5d49-441d-8bce-893481eed417, runId = 6cba7397-1113-47a6-bbff-479c8ea695fc]

user-2 thread name: ForkJoinPool.commonPool-worker-3
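This behaviour is easy to reproduce without Spark: a parallelStream launched from an ordinary thread splits its work between the calling thread and ForkJoinPool.commonPool() workers. A minimal sketch (the exact thread assignment is up to the runtime):

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class CommonPoolDemo {
  public static void main(String[] args) {
    // Record which thread handles each element of a parallelStream
    // started from the calling thread (no custom pool involved).
    Set<String> threadNames = List.of("User-1", "User-2", "User-3", "User-4")
        .parallelStream()
        .map(u -> Thread.currentThread().getName())
        .collect(Collectors.toSet());
    // Typically a mix of the calling thread and ForkJoinPool.commonPool-worker-N,
    // so thread-local state (such as a context ClassLoader set only on the
    // calling thread) is invisible to part of the workload.
    System.out.println(threadNames);
  }
}
```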

