cancel
Showing results for 
Search instead for 
Did you mean: 
Get Started Discussions
Start your journey with Databricks by joining discussions on getting started guides, tutorials, and introductory topics. Connect with beginners and experts alike to kickstart your Databricks experience.
cancel
Showing results for 
Search instead for 
Did you mean: 

Stream to stream join NullPointerException

TinasheChinyati
New Contributor III

I have a DLT pipeline running in continuous mode. It contains a stream-to-stream join that runs for the first 5 hours but then fails with a NullPointerException. I need help understanding what I should do to handle this. My code is structured as below:

@dlt.table(
    name="log_events",
    comment="Finalize log events data and prepare for Delta table storage",
    partition_cols=["LogSubmissionDateID"]
)
def log_events():
    """Join streamed log events to their log submissions and shape the output rows.

    Reads the ``processed_log_events`` and ``logsubmission`` datasets as
    streams, inner-joins them on ``Evnumber`` while keeping only submissions
    whose ``LogSubmissionDateID`` equals the string form of
    ``folder_name_currentdate`` (a name defined outside this function), and
    projects the columns stored in the ``log_events`` table.

    Returns:
        The streaming DataFrame produced by the join and projection.
    """
    # NOTE(review): no watermark is applied to either stream inside this
    # function. If none is defined upstream, the stream-stream join state is
    # never aged out -- confirm against the upstream table definitions.
    events_stream = dlt.read_stream("processed_log_events").alias("events")
    submissions_stream = dlt.read_stream("logsubmission").alias("logsubmission")

    # The two predicates that are AND-ed together to form the join condition.
    same_evnumber = col("events.Evnumber") == col("logsubmission.Evnumber")
    # The f-string is evaluated when this function runs, so the date is baked
    # into the plan as a literal.
    # NOTE(review): in a long-running continuous pipeline this value presumably
    # does not roll over with the calendar date -- confirm that is intended.
    on_current_date = (
        col("logsubmission.LogSubmissionDateID") == lit(f'{folder_name_currentdate}')
    )

    # Output projection, in the column order of the target table.
    output_columns = [
        expr("uuid()").alias("LogEventID"),
        # Unqualified: must resolve to exactly one side of the join.
        col('LogSubmissionID'),
        col("events.Evnumber").alias("Evnumber"),
        col("logsubmission.LogSubmissionDateID").alias("LogSubmissionDateID"),
        col("events.LogEventSequence").alias("LogEventSequence"),
        col("events.log_events").alias("log_events"),
        col("events.log_events.Type").alias("Type"),
        col("events.log_events.TimeStampDate").alias("EventTimeStamp"),
        # Drops the first two characters of EventCode (presumably a "0x"
        # prefix -- TODO confirm) and converts the rest from base 16 to an INT.
        expr("CAST(CONV(substr(events.log_events.EventCode, 3), 16, 10) AS INT)").alias('EventCode'),
        col("events.log_events.EventSubType").alias("EventSubType"),
    ]

    # NOTE(review): both inputs are streams, so the broadcast hint may have no
    # effect on how this join is planned -- confirm against the query plan.
    return events_stream.join(
        broadcast(submissions_stream),
        same_evnumber & on_current_date,
        "inner",
    ).select(*output_columns)
 
Error Message:
org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 88a8f9a0-aaec-4f01-b6d2-3c73767e38bf, runId = d81b0bb2-8e75-416f-b99c-aab3fca22bf1] terminated with exception: Exception thrown in awaitResult: Job aborted due to stage failure: Task 198 in stage 342551.0 failed 4 times, most recent failure: Lost task 198.3 in stage 342551.0 (TID 1828492) (10.139.64.4 executor 5): java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueRowConverterFormatV2.convertToValueRow(SymmetricHashJoinStateManager.scala:595) at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueStore.put(SymmetricHashJoinStateManager.scala:665) at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.$anonfun$getJoinedRows$2(SymmetricHashJoinStateManager.scala:124) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$AddingProcessedRowToStateCompletionIterator.<init>(StreamingSymmetricHashJoinExec.scala:681) at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner.$anonfun$storeAndJoinWithOtherSide$8(StreamingSymmetricHashJoinExec.scala:668) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:199) at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:227) at 
org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:99) at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:932) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:102) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:935) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:827) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)
0 REPLIES 0

Join Us as a Local Community Builder!

Passionate about hosting events and connecting people? Help us grow a vibrant local community—sign up today to get started!

Sign Up Now