
Stream-to-stream join NullPointerException

TinasheChinyati
New Contributor III

I have a DLT pipeline running in continuous mode. It includes a stream-to-stream join that runs fine for the first 5 hours but then fails with a NullPointerException. I need assistance to know what I need to do to handle this. My code is structured as below:

import dlt
from pyspark.sql.functions import broadcast, col, expr, lit

@dlt.table(
    name="log_events",
    comment="Finalize log events data and prepare for Delta table storage",
    partition_cols=["LogSubmissionDateID"]
)
def log_events():
    log_events_df = dlt.read_stream("processed_log_events").alias("events")
    log_inserted_df = dlt.read_stream("logsubmission").alias("logsubmission")

    # Join on the event number, restricted to the current submission date.
    # folder_name_currentdate is defined elsewhere in the pipeline.
    join_condition = (
        (col("events.Evnumber") == col("logsubmission.Evnumber")) &
        (col("logsubmission.LogSubmissionDateID") == lit(f"{folder_name_currentdate}"))
    )

    joined_df = log_events_df.join(broadcast(log_inserted_df), join_condition, "inner")

    events_df = joined_df.select(
        expr("uuid()").alias("LogEventID"),
        col("LogSubmissionID"),
        col("events.Evnumber").alias("Evnumber"),
        col("logsubmission.LogSubmissionDateID").alias("LogSubmissionDateID"),
        col("events.LogEventSequence").alias("LogEventSequence"),
        col("events.log_events").alias("log_events"),
        col("events.log_events.Type").alias("Type"),
        col("events.log_events.TimeStampDate").alias("EventTimeStamp"),
        # Parse the hex event code (dropping the "0x" prefix) into an integer.
        expr("CAST(CONV(substr(events.log_events.EventCode, 3), 16, 10) AS INT)").alias("EventCode"),
        col("events.log_events.EventSubType").alias("EventSubType")
    )

    return events_df
 
Error Message:
org.apache.spark.sql.streaming.StreamingQueryException: [STREAM_FAILED] Query [id = 88a8f9a0-aaec-4f01-b6d2-3c73767e38bf, runId = d81b0bb2-8e75-416f-b99c-aab3fca22bf1] terminated with exception: Exception thrown in awaitResult: Job aborted due to stage failure: Task 198 in stage 342551.0 failed 4 times, most recent failure: Lost task 198.3 in stage 342551.0 (TID 1828492) (10.139.64.4 executor 5): java.lang.NullPointerException
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.writeFields_0_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
    at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueRowConverterFormatV2.convertToValueRow(SymmetricHashJoinStateManager.scala:595)
    at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager$KeyWithIndexToValueStore.put(SymmetricHashJoinStateManager.scala:665)
    at org.apache.spark.sql.execution.streaming.state.SymmetricHashJoinStateManager.$anonfun$getJoinedRows$2(SymmetricHashJoinStateManager.scala:124)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner$AddingProcessedRowToStateCompletionIterator.<init>(StreamingSymmetricHashJoinExec.scala:681)
    at org.apache.spark.sql.execution.streaming.StreamingSymmetricHashJoinExec$OneSideHashJoiner.$anonfun$storeAndJoinWithOtherSide$8(StreamingSymmetricHashJoinExec.scala:668)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492)
    at scala.collection.Iterator$ConcatIterator.advance(Iterator.scala:199)
    at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:227)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:31)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage3.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.shuffle.sort.UnsafeShuffleWriter.write(UnsafeShuffleWriter.java:195)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:57)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$3(ShuffleMapTask.scala:92)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.$anonfun$runTask$1(ShuffleMapTask.scala:87)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:58)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:39)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:196)
    at org.apache.spark.scheduler.Task.doRunTask(Task.scala:181)
    at org.apache.spark.scheduler.Task.$anonfun$run$5(Task.scala:146)
    at com.databricks.unity.EmptyHandle$.runWithAndClose(UCSHandle.scala:129)
    at org.apache.spark.scheduler.Task.$anonfun$run$1(Task.scala:146)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.scheduler.Task.run(Task.scala:99)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$8(Executor.scala:932)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:102)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:935)
    at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:827)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
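
For context on where this fails: the frames in the trace sit inside StreamingSymmetricHashJoinExec and SymmetricHashJoinStateManager, i.e. the stateful stream-stream join, which keeps rows from both sides in the state store. Neither stream above defines a watermark, so in continuous mode that join state grows without bound. Below is a minimal sketch of the same join with watermarks and an event-time constraint, which is how Structured Streaming is allowed to expire old state; the table name log_events_watermarked, the event-time columns EventTimeStamp and SubmissionTimeStamp, and the 6-hour bound are all assumptions, not details confirmed by the post.

import dlt
from pyspark.sql.functions import col, expr

@dlt.table(name="log_events_watermarked")  # hypothetical table name
def log_events_watermarked():
    # Watermarks bound how long each side's rows are retained in join state.
    # Both event-time column names below are assumptions; substitute the
    # real timestamp columns from each stream.
    events_df = (
        dlt.read_stream("processed_log_events")
        .withWatermark("EventTimeStamp", "6 hours")
        .alias("events")
    )
    submissions_df = (
        dlt.read_stream("logsubmission")
        .withWatermark("SubmissionTimeStamp", "6 hours")
        .alias("logsubmission")
    )

    # An event-time range condition, together with the watermarks, lets
    # Spark drop expired state instead of accumulating it indefinitely.
    join_condition = (
        (col("events.Evnumber") == col("logsubmission.Evnumber")) &
        (col("events.EventTimeStamp") >= col("logsubmission.SubmissionTimeStamp")) &
        (col("events.EventTimeStamp") <=
            col("logsubmission.SubmissionTimeStamp") + expr("INTERVAL 6 HOURS"))
    )

    return events_df.join(submissions_df, join_condition, "inner")

The sketch also drops the broadcast() hint: as the StreamingSymmetricHashJoinExec frames show, the query executed as a stateful symmetric hash join regardless, since both inputs are streams.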
