I have a DLT pipeline running in continous mode. I have a stream to stream join which runs for the first 5hrs but then fails with a Null Pointer Exception. I need assistance to know what I need to do to handle this. my code is structured as below:
@dlt.table(
name="log_events",
comment="Finalize log events data and prepare for Delta table storage",
partition_cols=["LogSubmissionDateID"]
)
def log_events():
log_events_df = dlt.read_stream("processed_log_events").alias("events")
log_inserted_df = dlt.read_stream("logsubmission").alias("logsubmission")
join_condition = (
(col("events.Evnumber") == col("logsubmission.Evnumber")) &
(col("logsubmission.LogSubmissionDateID") == lit(f'{folder_name_currentdate}'))
)
joined_df = log_events_df.join(broadcast(log_inserted_df), join_condition, "inner")
events_df = joined_df.select(
expr("uuid()").alias("LogEventID"),
col('LogSubmissionID'),
col("events.Evnumber").alias("Evnumber"),
col("logsubmission.LogSubmissionDateID").alias("LogSubmissionDateID"),
col("events.LogEventSequence").alias("LogEventSequence"),
col("events.log_events").alias("log_events"),
col("events.log_events.Type").alias("Type"),
col("events.log_events.TimeStampDate").alias("EventTimeStamp"),
expr("CAST(CONV(substr(events.log_events.EventCode, 3), 16, 10) AS INT)").alias('EventCode'),
col("events.log_events.EventSubType").alias("EventSubType")
)
return events_df
Error Message:
org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 88a8f9a0-aaec-4f01-b6d2-3c73767e38bf, runId = d81b0bb2-8e75-416f-b99c-aab3fca22bf1] terminated with exception: Exception thrown in awaitResult: Job aborted due to stage failure: Task 198 in stage 342551.0 failed 4 times, most recent failure: Lost task 198.3 in stage 342551.0 (TID 1828492) (10.139.64.4 executor 5): java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source) at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueRowConverterFormatV2.convertToValueRow(SymmetricHashJoinStateManager.scala:595) at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueStore.put(SymmetricHashJoinStateManager.scala:665) at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.$anonfun$getJoinedRows$2(SymmetricHashJoinStateManager.scala:124) at scala.collection.Iterator$$anon$10.next(Iterator.scala:461) at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$AddingProcessedRowToStateCompletionIterator.<init>(StreamingSymmetricHashJoinExec.scala:681) at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner.$anonfun$storeAndJoinWithOtherSide$8(StreamingSymmetricHashJoinExec.scala:668) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:199) at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:227) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195) at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58) at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39) at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196) at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181) at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146) at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129) at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.scheduler.Task.run(Task.scala:99) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:932) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64) at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:102) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:935) at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23) at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:827) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750)