Stream to stream join NullPointerException

TinasheChinyati
New Contributor

I have a DLT pipeline running in continuous mode. It contains a stream-to-stream join that runs for the first 5 hours but then fails with a NullPointerException. I need assistance to know what I need to do to handle this. My code is structured as below:

import dlt
from pyspark.sql.functions import broadcast, col, expr, lit

@dlt.table(
    name="log_events",
    comment="Finalize log events data and prepare for Delta table storage",
    partition_cols=["LogSubmissionDateID"]
)
def log_events():
    # Read both source streams; folder_name_currentdate is set earlier in the pipeline.
    log_events_df = dlt.read_stream("processed_log_events").alias("events")
    log_inserted_df = dlt.read_stream("logsubmission").alias("logsubmission")

    join_condition = (
        (col("events.Evnumber") == col("logsubmission.Evnumber")) &
        (col("logsubmission.LogSubmissionDateID") == lit(f'{folder_name_currentdate}'))
    )

    # Stream-to-stream inner join, broadcasting the smaller side.
    joined_df = log_events_df.join(broadcast(log_inserted_df), join_condition, "inner")

    events_df = joined_df.select(
        expr("uuid()").alias("LogEventID"),
        col("LogSubmissionID"),
        col("events.Evnumber").alias("Evnumber"),
        col("logsubmission.LogSubmissionDateID").alias("LogSubmissionDateID"),
        col("events.LogEventSequence").alias("LogEventSequence"),
        col("events.log_events").alias("log_events"),
        col("events.log_events.Type").alias("Type"),
        col("events.log_events.TimeStampDate").alias("EventTimeStamp"),
        expr("CAST(CONV(substr(events.log_events.EventCode, 3), 16, 10) AS INT)").alias("EventCode"),
        col("events.log_events.EventSubType").alias("EventSubType")
    )

    return events_df
 
Error Message:
org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 88a8f9a0-aaec-4f01-b6d2-3c73767e38bf, runId = d81b0bb2-8e75-416f-b99c-aab3fca22bf1] terminated with exception: Exception thrown in awaitResult: Job aborted due to stage failure: Task 198 in stage 342551.0 failed 4 times, most recent failure: Lost task 198.3 in stage 342551.0 (TID 1828492) (10.139.64.4 executor 5): java.lang.NullPointerException
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
  at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueRowConverterFormatV2.convertToValueRow(SymmetricHashJoinStateManager.scala:595)
  at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueStore.put(SymmetricHashJoinStateManager.scala:665)
  at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.$anonfun$getJoinedRows$2(SymmetricHashJoinStateManager.scala:124)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
  at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$AddingProcessedRowToStateCompletionIterator.<init>(StreamingSymmetricHashJoinExec.scala:681)
  at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner.$anonfun$storeAndJoinWithOtherSide$8(StreamingSymmetricHashJoinExec.scala:668)
  at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
  at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
  at scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:199)
  at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:227)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
  at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
  at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
  at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
  at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
  at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
  at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.scheduler.Task.run(Task.scala:99)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:932)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:102)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:935)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:827)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)
1 REPLY

Kaniz
Community Manager

Hi @TinasheChinyati, it looks like you're encountering a NullPointerException in your DLT pipeline when performing the stream-to-stream join. Let's break down the issue and explore potential solutions:

  • The error message indicates that the streaming query terminated with a NullPointerException.
  • The stack trace shows the exception was thrown inside the stream-to-stream join's state management (SymmetricHashJoinStateManager) while a row was being written to the join state store.
  • A NullPointerException typically occurs when you try to access or manipulate an object that is null; in your case it is most likely related to the join condition or to the data being processed.
  • Let's investigate the following areas (a rough code sketch follows this list):
    • Join condition: verify that the join condition is correctly defined and that both log_events_df and log_inserted_df have non-null values in the columns used for the join.
    • Data quality: check whether the relevant columns (Evnumber, LogSubmissionDateID, etc.) contain null values in either stream.
    • Broadcast join: you are broadcasting log_inserted_df; make sure that DataFrame is not empty or null.
    • Column aliases: confirm that the aliases (alias("events") and alias("logsubmission")) are applied correctly and consistently throughout the code.
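For illustration only, here is a minimal sketch of the first two points, reusing the table and column names from your snippet (everything beyond those names is an assumption, not your actual pipeline): rows with null join keys are filtered out before they reach the stream-to-stream join, so no null key is written to the join state store.

import dlt
from pyspark.sql.functions import broadcast, col, lit

# Sketch only: drop rows whose join keys are null before the join.
log_events_df = (
    dlt.read_stream("processed_log_events")
    .filter(col("Evnumber").isNotNull())
    .alias("events")
)
log_inserted_df = (
    dlt.read_stream("logsubmission")
    .filter(col("Evnumber").isNotNull() & col("LogSubmissionDateID").isNotNull())
    .alias("logsubmission")
)

# folder_name_currentdate is assumed to be defined elsewhere in your pipeline.
join_condition = (
    (col("events.Evnumber") == col("logsubmission.Evnumber")) &
    (col("logsubmission.LogSubmissionDateID") == lit(folder_name_currentdate))
)

joined_df = log_events_df.join(broadcast(log_inserted_df), join_condition, "inner")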
  • Suggestions (a data-quality sketch follows below):
    • Add explicit null checks to your join condition, or filter out rows with null join keys before the join.
    • Validate data quality and ensure the columns involved in the join are not null.
    • If possible, log intermediate results (for example, null counts per column) to help identify where the null values are coming from.
    • Review the logic that computes folder_name_currentdate to ensure it is correctly set.
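As a second, equally hedged sketch of the data-quality suggestions: a DLT expectation can drop rows with null join keys upstream and record how many were dropped in the pipeline's data-quality metrics, and a small helper can report null counts per column when run against the source tables in a notebook. The table name processed_log_events_clean and the helper null_counts below are made up for illustration.

import dlt
from pyspark.sql.functions import col, count, when

# Sketch only: hypothetical intermediate table that drops rows with a null
# join key; the expectation also surfaces the drop count in DLT metrics.
@dlt.table(name="processed_log_events_clean")
@dlt.expect_or_drop("non_null_join_key", "Evnumber IS NOT NULL")
def processed_log_events_clean():
    return dlt.read_stream("processed_log_events")

# Sketch only: hypothetical helper that counts nulls per column, e.g.
# null_counts(spark.table("logsubmission"), ["Evnumber", "LogSubmissionDateID"]).
def null_counts(df, columns):
    return df.select(
        *[count(when(col(c).isNull(), c)).alias(f"{c}_nulls") for c in columns]
    )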

Remember to thoroughly check your data and join conditions to pinpoint the root cause of the issue. If you need further assistance, feel free to ask! 😊