<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Stream to stream join NullPointerException in Get Started Discussions</title>
    <link>https://community.databricks.com/t5/get-started-discussions/stream-to-stream-join-nullpointerexception/m-p/68082#M7204</link>
    <description>&lt;P&gt;I have a DLT pipeline running in continous mode. I have a stream to stream join which runs for the first 5hrs but then fails with a Null Pointer Exception. I need assistance to know what I need to do to handle this. my code is structured as below:&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;@&lt;/SPAN&gt;&lt;SPAN&gt;dlt&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;name&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"log_events"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;comment&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"Finalize log events data and prepare for Delta table storage"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;partition_cols&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;[&lt;/SPAN&gt;&lt;SPAN&gt;"LogSubmissionDateID"&lt;/SPAN&gt;&lt;SPAN&gt;]&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;log_events&lt;/SPAN&gt;&lt;SPAN&gt;():&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; log_events_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; dlt.&lt;/SPAN&gt;&lt;SPAN&gt;read_stream&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"processed_log_events"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; log_inserted_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; dlt.&lt;/SPAN&gt;&lt;SPAN&gt;read_stream&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"logsubmission"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"logsubmission"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; join_condition &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; (&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.Evnumber"&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;==&lt;/SPAN&gt; &lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"logsubmission.Evnumber"&lt;/SPAN&gt;&lt;SPAN&gt;)) &lt;/SPAN&gt;&lt;SPAN&gt;&amp;amp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; (&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"logsubmission.LogSubmissionDateID"&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;==&lt;/SPAN&gt; &lt;SPAN&gt;lit&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;'&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;folder_name_currentdate&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;'&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; joined_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; log_events_df.&lt;/SPAN&gt;&lt;SPAN&gt;join&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;broadcast&lt;/SPAN&gt;&lt;SPAN&gt;(log_inserted_df), join_condition, &lt;/SPAN&gt;&lt;SPAN&gt;"inner"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; events_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; joined_df.&lt;/SPAN&gt;&lt;SPAN&gt;select&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;expr&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"uuid()"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"LogEventID"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'LogSubmissionID'&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.Evnumber"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Evnumber"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"logsubmission.LogSubmissionDateID"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"LogSubmissionDateID"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.LogEventSequence"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"LogEventSequence"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.log_events"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"log_events"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.log_events.Type"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Type"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.log_events.TimeStampDate"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"EventTimeStamp"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;expr&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"CAST(CONV(substr(events.log_events.EventCode, 3), 16, 10) AS INT)"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'EventCode'&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.log_events.EventSubType"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"EventSubType"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt; events_df&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;Error Message:&lt;BR /&gt;org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 88a8f9a0-aaec-4f01-b6d2-3c73767e38bf, runId = d81b0bb2-8e75-416f-b99c-aab3fca22bf1] terminated with exception: Exception thrown in awaitResult: Job aborted due to stage failure: Task 198 in stage 342551.0 failed 4 times, most recent failure: Lost task 198.3 in stage 342551.0 (TID 1828492) (10.139.64.4 executor 5): java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueRowConverterFormatV2.convertToValueRow(SymmetricHashJoinStateManager.scala:595) at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueStore.put(SymmetricHashJoinStateManager.scala:665) at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.$anonfun$getJoinedRows$2(SymmetricHashJoinStateManager.scala:124) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$AddingProcessedRowToStateCompletionIterator.&amp;lt;init&amp;gt;(StreamingSymmetricHashJoinExec.scala:681) at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner.$anonfun$storeAndJoinWithOtherSide$8(StreamingSymmetricHashJoinExec.scala:668) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:199) at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:227) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:932) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:102) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:935) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:827) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)&lt;BR /&gt;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
    <pubDate>Sat, 04 May 2024 06:26:35 GMT</pubDate>
    <dc:creator>TinasheChinyati</dc:creator>
    <dc:date>2024-05-04T06:26:35Z</dc:date>
    <item>
      <title>Stream to stream join NullPointerException</title>
      <link>https://community.databricks.com/t5/get-started-discussions/stream-to-stream-join-nullpointerexception/m-p/68082#M7204</link>
      <description>&lt;P&gt;I have a DLT pipeline running in continous mode. I have a stream to stream join which runs for the first 5hrs but then fails with a Null Pointer Exception. I need assistance to know what I need to do to handle this. my code is structured as below:&lt;/P&gt;&lt;DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;@&lt;/SPAN&gt;&lt;SPAN&gt;dlt&lt;/SPAN&gt;&lt;SPAN&gt;.&lt;/SPAN&gt;&lt;SPAN&gt;table&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;name&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"log_events"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;comment&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;"Finalize log events data and prepare for Delta table storage"&lt;/SPAN&gt;&lt;SPAN&gt;,&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;partition_cols&lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt;[&lt;/SPAN&gt;&lt;SPAN&gt;"LogSubmissionDateID"&lt;/SPAN&gt;&lt;SPAN&gt;]&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;def&lt;/SPAN&gt; &lt;SPAN&gt;log_events&lt;/SPAN&gt;&lt;SPAN&gt;():&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; log_events_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; dlt.&lt;/SPAN&gt;&lt;SPAN&gt;read_stream&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"processed_log_events"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; log_inserted_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; dlt.&lt;/SPAN&gt;&lt;SPAN&gt;read_stream&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"logsubmission"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"logsubmission"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; join_condition &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; (&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; (&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.Evnumber"&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;==&lt;/SPAN&gt; &lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"logsubmission.Evnumber"&lt;/SPAN&gt;&lt;SPAN&gt;)) &lt;/SPAN&gt;&lt;SPAN&gt;&amp;amp;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; (&lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"logsubmission.LogSubmissionDateID"&lt;/SPAN&gt;&lt;SPAN&gt;) &lt;/SPAN&gt;&lt;SPAN&gt;==&lt;/SPAN&gt; &lt;SPAN&gt;lit&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;f&lt;/SPAN&gt;&lt;SPAN&gt;'&lt;/SPAN&gt;&lt;SPAN&gt;{&lt;/SPAN&gt;&lt;SPAN&gt;folder_name_currentdate&lt;/SPAN&gt;&lt;SPAN&gt;}&lt;/SPAN&gt;&lt;SPAN&gt;'&lt;/SPAN&gt;&lt;SPAN&gt;))&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; joined_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; log_events_df.&lt;/SPAN&gt;&lt;SPAN&gt;join&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;broadcast&lt;/SPAN&gt;&lt;SPAN&gt;(log_inserted_df), join_condition, &lt;/SPAN&gt;&lt;SPAN&gt;"inner"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;BR /&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; events_df &lt;/SPAN&gt;&lt;SPAN&gt;=&lt;/SPAN&gt;&lt;SPAN&gt; joined_df.&lt;/SPAN&gt;&lt;SPAN&gt;select&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;expr&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"uuid()"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"LogEventID"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'LogSubmissionID'&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.Evnumber"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Evnumber"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"logsubmission.LogSubmissionDateID"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"LogSubmissionDateID"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.LogEventSequence"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"LogEventSequence"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.log_events"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"log_events"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.log_events.Type"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"Type"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.log_events.TimeStampDate"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"EventTimeStamp"&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;expr&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"CAST(CONV(substr(events.log_events.EventCode, 3), 16, 10) AS INT)"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;'EventCode'&lt;/SPAN&gt;&lt;SPAN&gt;),&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;col&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"events.log_events.EventSubType"&lt;/SPAN&gt;&lt;SPAN&gt;).&lt;/SPAN&gt;&lt;SPAN&gt;alias&lt;/SPAN&gt;&lt;SPAN&gt;(&lt;/SPAN&gt;&lt;SPAN&gt;"EventSubType"&lt;/SPAN&gt;&lt;SPAN&gt;)&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; )&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;&amp;nbsp; &amp;nbsp; &lt;/SPAN&gt;&lt;SPAN&gt;return&lt;/SPAN&gt;&lt;SPAN&gt; events_df&lt;/SPAN&gt;&lt;/DIV&gt;&lt;DIV&gt;&amp;nbsp;&lt;/DIV&gt;&lt;DIV&gt;&lt;SPAN&gt;Error Message:&lt;BR /&gt;org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 88a8f9a0-aaec-4f01-b6d2-3c73767e38bf, runId = d81b0bb2-8e75-416f-b99c-aab3fca22bf1] terminated with exception: Exception thrown in awaitResult: Job aborted due to stage failure: Task 198 in stage 342551.0 failed 4 times, most recent failure: Lost task 198.3 in stage 342551.0 (TID 1828492) (10.139.64.4 executor 5): java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueRowConverterFormatV2.convertToValueRow(SymmetricHashJoinStateManager.scala:595) at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueStore.put(SymmetricHashJoinStateManager.scala:665) at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.$anonfun$getJoinedRows$2(SymmetricHashJoinStateManager.scala:124) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$AddingProcessedRowToStateCompletionIterator.&amp;lt;init&amp;gt;(StreamingSymmetricHashJoinExec.scala:681) at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner.$anonfun$storeAndJoinWithOtherSide$8(StreamingSymmetricHashJoinExec.scala:668) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:199) at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:227) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:932) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:102) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:935) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:827) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)&lt;BR /&gt;&lt;/SPAN&gt;&lt;/DIV&gt;&lt;/DIV&gt;</description>
      <pubDate>Sat, 04 May 2024 06:26:35 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/stream-to-stream-join-nullpointerexception/m-p/68082#M7204</guid>
      <dc:creator>TinasheChinyati</dc:creator>
      <dc:date>2024-05-04T06:26:35Z</dc:date>
    </item>
  </channel>
</rss>

