<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Spark read with format as "delta" isn't working with Java multithreading in Get Started Discussions</title>
    <link>https://community.databricks.com/t5/get-started-discussions/spark-read-with-format-as-quot-delta-quot-isn-t-working-with/m-p/55397#M6276</link>
    <description>&lt;P&gt;The problem was indeed the context ClassLoader of the ForkJoinPool common-pool worker thread. Spark's SparkClassUtils uses Thread.currentThread().getContextClassLoader(), and a common-pool worker does not necessarily have the same context ClassLoader as the thread that started the stream, so the Delta data source could not be found from that worker. This also explains why User-1 succeeded: the thread that calls parallelStream() takes part in the work itself, so one event ran on the stream execution thread, which has the correct ClassLoader.&lt;/P&gt;&lt;P&gt;To solve it, I created my own ForkJoinWorkerThreadFactory implementation that sets each new worker's context ClassLoader to that of the thread creating the worker (in practice, the stream execution thread that submits the work). I then created a custom ForkJoinPool with this factory and ran the parallelStream() inside it.&lt;/P&gt;&lt;PRE&gt;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;

public class MyForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {

  &lt;SPAN class=""&gt;@Override&lt;/SPAN&gt;
  &lt;SPAN class=""&gt;public&lt;/SPAN&gt; ForkJoinWorkerThread &lt;SPAN class=""&gt;newThread&lt;/SPAN&gt;&lt;SPAN class=""&gt;(ForkJoinPool pool)&lt;/SPAN&gt; {
    &lt;SPAN class=""&gt;final&lt;/SPAN&gt; &lt;SPAN class=""&gt;ForkJoinWorkerThread&lt;/SPAN&gt; &lt;SPAN class=""&gt;worker&lt;/SPAN&gt; &lt;SPAN class=""&gt;=&lt;/SPAN&gt; ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
    worker.setName(&lt;SPAN class=""&gt;"my-fjpworker-"&lt;/SPAN&gt; + worker.getPoolIndex());
    worker.setContextClassLoader(Thread.currentThread().getContextClassLoader());  &lt;SPAN class=""&gt;// Set the ClassLoader&lt;/SPAN&gt;
    &lt;SPAN class=""&gt;return&lt;/SPAN&gt; worker;
  }
}&lt;/PRE&gt;&lt;P&gt;Similar issue: &lt;A href="https://stackoverflow.com/questions/66393004/jaxb-class-not-found-with-forkjoinpool-java" target="_blank" rel="noopener"&gt;JAXB class not found with ForkJoinPool Java&lt;/A&gt;&lt;/P&gt;&lt;P&gt;Code reference: &lt;A href="https://stackoverflow.com/a/57551188/23113106" target="_blank" rel="noopener"&gt;https://stackoverflow.com/a/57551188/23113106&lt;/A&gt;&lt;/P&gt;</description>
    <pubDate>Sun, 17 Dec 2023 16:17:23 GMT</pubDate>
    <dc:creator>kartik-chandra</dc:creator>
    <dc:date>2023-12-17T16:17:23Z</dc:date>
    <item>
      <title>Spark read with format as "delta" isn't working with Java multithreading</title>
      <link>https://community.databricks.com/t5/get-started-discussions/spark-read-with-format-as-quot-delta-quot-isn-t-working-with/m-p/55362#M6274</link>
      <description>&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;0&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV class=""&gt;&lt;SPAN&gt;I have a Spark application (using Java library) which needs to replicate data from one blob storage to another. I have created a readStream() within it which is listening continuously to a Kafka topic for incoming events. The corresponding writeStream is configured to process the same using the foreachBatch() option.&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;&lt;DIV class=""&gt;&lt;DIV class=""&gt;&lt;PRE&gt;Dataset&amp;lt;Row&amp;gt; readDf = spark
        .readStream()
        .format(&lt;SPAN class=""&gt;"kafka"&lt;/SPAN&gt;)
        .options(...)
        .load();

readDf.writeStream()
      .outputMode(OutputMode.Update())
      .foreachBatch(...)
      .trigger(Trigger.ProcessingTime(1000L))
      .start();&lt;/PRE&gt;&lt;P&gt;I can receive several events at once, one per user ID. For each user event, I need to read a Delta table from blob store 1 and write it as Parquet (with some transformation) into blob store 2. Since the replication is independent for each user event, I am using Java multithreading via parallelStream().&lt;/P&gt;&lt;PRE&gt;userEvents.parallelStream().forEach(userEvent -&amp;gt; {
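      // Hypothetical diagnostic (not in the original post): print which thread handles
      // each event and its context ClassLoader, which can differ on a common-pool worker.
      System.out.println(Thread.currentThread().getName() + " uses " + Thread.currentThread().getContextClassLoader());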
      Dataset&amp;lt;Row&amp;gt; readEventDf = spark
           .read()
           .format(&lt;SPAN class=""&gt;"delta"&lt;/SPAN&gt;)
           .load(readPath);

&lt;SPAN class=""&gt;//perform some transformation on the above dataframe&lt;/SPAN&gt;

      readEventDf.write()
        .format(&lt;SPAN class=""&gt;"parquet"&lt;/SPAN&gt;)
        .mode(SaveMode.Overwrite)
        .save(writePath);
});&lt;/PRE&gt;&lt;P&gt;However, during testing I am observing strange behaviour. I pass 2 user events, say User-1 and User-2, and I can see 2 parallel threads processing them via parallelStream(). Processing succeeds for User-1 on one thread but fails for User-2 on the other thread at the &lt;STRONG&gt;read()&lt;/STRONG&gt; step itself, with the error &lt;STRONG&gt;Failed to find data source: delta. Please find packages at...&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;I checked that the SparkSession, SparkContext and SQLContext objects active in both threads are exactly the same. The Spark conf properties are also exactly the same.&lt;/P&gt;&lt;P&gt;If I remove the parallelism by changing parallelStream() to a normal sequential stream(), everything works fine. Hence, it is evident that there is no issue with the Spark/Delta libraries used, the connection to the blob store, or the file format.&lt;/P&gt;&lt;P&gt;Can someone explain this behaviour and how to resolve it?&lt;/P&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Sun, 17 Dec 2023 05:12:22 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/spark-read-with-format-as-quot-delta-quot-isn-t-working-with/m-p/55362#M6274</guid>
      <dc:creator>kartik-chandra</dc:creator>
      <dc:date>2023-12-17T05:12:22Z</dc:date>
    </item>
    <item>
      <title>Re: Spark read with format as "delta" isn't working with Java multithreading</title>
      <link>https://community.databricks.com/t5/get-started-discussions/spark-read-with-format-as-quot-delta-quot-isn-t-working-with/m-p/55381#M6275</link>
      <description>&lt;P&gt;&lt;STRONG&gt;---UPDATE---&lt;/STRONG&gt;&lt;/P&gt;&lt;P&gt;I printed the thread names and observed that User-1 is processed on the same thread as the main program, but User-2 is processed on a ForkJoinPool common-pool thread.&lt;/P&gt;&lt;P&gt;Could it be that the Spark conf (related to the Delta format) set in the MAIN thread is somehow not passed on to the new ForkJoinPool thread?&lt;/P&gt;&lt;PRE&gt;MAIN thread name: stream execution thread for [id = 04970c13-5d49-441d-8bce-893481eed417, runId = 6cba7397-1113-47a6-bbff-479c8ea695fc]

user-1 thread name: stream execution thread for [id = 04970c13-5d49-441d-8bce-893481eed417, runId = 6cba7397-1113-47a6-bbff-479c8ea695fc]

user-2 thread name: ForkJoinPool.commonPool-worker-3&lt;/PRE&gt;</description>
      <pubDate>Sun, 17 Dec 2023 15:06:26 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/spark-read-with-format-as-quot-delta-quot-isn-t-working-with/m-p/55381#M6275</guid>
      <dc:creator>kartik-chandra</dc:creator>
      <dc:date>2023-12-17T15:06:26Z</dc:date>
    </item>
    <item>
      <title>Re: Spark read with format as "delta" isn't working with Java multithreading</title>
      <link>https://community.databricks.com/t5/get-started-discussions/spark-read-with-format-as-quot-delta-quot-isn-t-working-with/m-p/55397#M6276</link>
      <description>&lt;P&gt;The problem was indeed the context ClassLoader of the ForkJoinPool common-pool worker thread. Spark's SparkClassUtils uses Thread.currentThread().getContextClassLoader(), and a common-pool worker does not necessarily have the same context ClassLoader as the thread that started the stream, so the Delta data source could not be found from that worker. This also explains why User-1 succeeded: the thread that calls parallelStream() takes part in the work itself, so one event ran on the stream execution thread, which has the correct ClassLoader.&lt;/P&gt;&lt;P&gt;To solve it, I created my own ForkJoinWorkerThreadFactory implementation that sets each new worker's context ClassLoader to that of the thread creating the worker (in practice, the stream execution thread that submits the work). I then created a custom ForkJoinPool with this factory and ran the parallelStream() inside it.&lt;/P&gt;&lt;PRE&gt;import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinPool.ForkJoinWorkerThreadFactory;
import java.util.concurrent.ForkJoinWorkerThread;

public class MyForkJoinWorkerThreadFactory implements ForkJoinWorkerThreadFactory {

  &lt;SPAN class=""&gt;@Override&lt;/SPAN&gt;
  &lt;SPAN class=""&gt;public&lt;/SPAN&gt; ForkJoinWorkerThread &lt;SPAN class=""&gt;newThread&lt;/SPAN&gt;&lt;SPAN class=""&gt;(ForkJoinPool pool)&lt;/SPAN&gt; {
    &lt;SPAN class=""&gt;final&lt;/SPAN&gt; &lt;SPAN class=""&gt;ForkJoinWorkerThread&lt;/SPAN&gt; &lt;SPAN class=""&gt;worker&lt;/SPAN&gt; &lt;SPAN class=""&gt;=&lt;/SPAN&gt; ForkJoinPool.defaultForkJoinWorkerThreadFactory.newThread(pool);
    worker.setName(&lt;SPAN class=""&gt;"my-fjpworker-"&lt;/SPAN&gt; + worker.getPoolIndex());
    worker.setContextClassLoader(Thread.currentThread().getContextClassLoader());  &lt;SPAN class=""&gt;// Set the ClassLoader&lt;/SPAN&gt;
    &lt;SPAN class=""&gt;return&lt;/SPAN&gt; worker;
  }
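
  // Hypothetical helper (not in the original post): runs the parallel stream in a custom pool,
  // assuming the JDK implementation detail that a parallel stream started inside a ForkJoinPool
  // task runs on that pool rather than on the common pool.
  // Usage sketch: runInPool(4, () -&amp;gt; userEvents.parallelStream().forEach(...));
  public static void runInPool(int parallelism, Runnable streamWork) throws Exception {
    final ForkJoinPool pool = new ForkJoinPool(parallelism, new MyForkJoinWorkerThreadFactory(), null, false);
    try {
      pool.submit(streamWork).get(); // block until the whole stream has finished
    } finally {
      pool.shutdown();
    }
  }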
}&lt;/PRE&gt;&lt;P&gt;Similar issue: &lt;A href="https://stackoverflow.com/questions/66393004/jaxb-class-not-found-with-forkjoinpool-java" target="_blank" rel="noopener"&gt;JAXB class not found with ForkJoinPool Java&lt;/A&gt;&lt;/P&gt;&lt;P&gt;Code reference: &lt;A href="https://stackoverflow.com/a/57551188/23113106" target="_blank" rel="noopener"&gt;https://stackoverflow.com/a/57551188/23113106&lt;/A&gt;&lt;/P&gt;</description>
      <pubDate>Sun, 17 Dec 2023 16:17:23 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/spark-read-with-format-as-quot-delta-quot-isn-t-working-with/m-p/55397#M6276</guid>
      <dc:creator>kartik-chandra</dc:creator>
      <dc:date>2023-12-17T16:17:23Z</dc:date>
    </item>
  </channel>
</rss>

