Streaming data from a Delta table to Event Hubs after merging data - getting a timeout error

Ruby8376
Valued Contributor

Here is my code to write data from a Delta table to Event Hubs (where the consumer team will consume the data):

    import org.apache.spark.eventhubs.EventHubsConf
    import org.apache.spark.sql.functions._
    import org.apache.spark.sql.streaming.Trigger
    import spark.implicits._ // $"col" and toDF (already auto-imported in Databricks notebooks)
    import java.time.ZonedDateTime
    import java.time.format.DateTimeFormatter
    import scala.concurrent.duration._ // for 2.seconds in the retry loop

    // Configure Azure Event Hub details  
    val namespaceNameOut = "des-ent-prod-cus-stream-eventhub-001"  
    val eventHubNameOut = "hvc-prodstats-output"  
    val sasKeyNameOut = "writer"  
    val sasKeyOut = dbutils.secrets.get(scope="deskvscope", key="des-ent-prod-cus-stream-eventhub-001-writer")  

    // Configure checkpoint and bad data paths
    val checkpoint_dir_path = "/mnt/hvc-wenco/prodstats/stream/checkpoints"
    val baddata_path = "/mnt/hvc-wenco/prodstats/stream/Bad_data"

    // Define timestamp checkpoint path  
    val tbl_version_timestamp_path = "/mnt/hvc-wenco/equip_status_history_table/checkpoints/checkpoint"  

    // Configure other parameters  
    val MaxEvents = 5000  

    // Read the last checkpoint timestamp  
    val last_checkpoint_string = dbutils.fs.head(tbl_version_timestamp_path)  

    // Parse last checkpoint timestamp  
    val time_format = "yyyy-MM-dd HH:mm:ss.SSSz"  
    val formatter = DateTimeFormatter.ofPattern(time_format)  
    val last_checkpoint = ZonedDateTime.parse(last_checkpoint_string, formatter)  

    // Build connection to Event Hub  
    val connStrOut = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()  
            .setNamespaceName(namespaceNameOut)  
            .setEventHubName(eventHubNameOut)  
            .setSasKeyName(sasKeyNameOut)  
            .setSasKey(sasKeyOut)  
    val ehWriteConf = EventHubsConf(connStrOut.toString())  

    // Create a streaming dataframe from the Delta table  
    val InputStreamingDF =   
      spark  
        .readStream  
        .option("maxFilesPerTrigger", 1)  
        .option("startingTimestamp", last_checkpoint_string)  
        .option("readChangeFeed", "true")
        .table("wencohvc.equip_status_history_table")  
    // Drop update pre-images; keep inserts, update post-images and deletes
    val dropPreTransform = InputStreamingDF.filter(InputStreamingDF("_change_type") =!= "update_preimage")
    // insert -> 2, update_postimage -> 4; a delete matches neither branch, so "operation" is null for deletes
    val operationTransform = dropPreTransform.withColumn("operation",
      when($"_change_type" === "insert", 2).otherwise(when($"_change_type" === "update_postimage", 4)))

    // "Y" for delete rows, "N" for everything else
    val transformedDF = operationTransform.withColumn("DeletedIndicator", when($"_change_type" === "delete", "Y").otherwise("N"))
    val finalDF = transformedDF.drop("_change_type", "_commit_version", "_commit_timestamp")

    // Write to Event Hubs with retry and checkpointing  
    var retry = true  
    var retryCount = 0  
    val maxRetries = 3  

    while (retry && retryCount < maxRetries) {  
      try {  
        val stream = finalDF  
          .select(to_json(struct(/* column list */)).alias("body"))  
          .writeStream  
          .format("eventhubs")  
          .options(ehWriteConf.toMap)  
          .option("checkpointLocation", checkpoint_dir_path)  
          .trigger(Trigger.AvailableNow())
          .start()  

        stream.awaitTermination()  
        retry = false  
      } catch {  
        case e: Exception =>  
          retryCount += 1  
          if (retryCount < maxRetries) {  
            val delay = 2.seconds * retryCount  
            println(s"Stream attempt $retryCount failed, retrying in ${delay.toSeconds} seconds...")  
            Thread.sleep(delay.toMillis)  
          }  
      }  
    }  

    // Write checkpoint  
    val emptyDF = Seq((1)).toDF("seq")  
    val checkpoint_timestamp = emptyDF.withColumn("current_timestamp", current_timestamp()).first().getTimestamp(1) + "+00:00"  
    dbutils.fs.put(tbl_version_timestamp_path, checkpoint_timestamp.toString(), true)   
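A side note on the final command: `getTimestamp(1) + "+00:00"` relies on `java.sql.Timestamp.toString`, which prints a variable number of fractional digits (for example `.0`, `.1` or `.123456`) in the JVM's default time zone. That string does not always match the fixed three-digit `SSS` in `time_format`, so `ZonedDateTime.parse` on the next run can throw, and the `+00:00` suffix is only correct when the cluster runs in UTC. A minimal sketch, assuming the checkpoint file keeps the same `yyyy-MM-dd HH:mm:ss.fff+00:00` shape: write the timestamp straight from `java.time` in UTC and read it back with a formatter that tolerates 0 to 9 fractional digits. `CheckpointStamp` and its members are hypothetical names for illustration, not part of Spark or Databricks.

```scala
import java.time.{Clock, OffsetDateTime, ZoneOffset}
import java.time.format.{DateTimeFormatter, DateTimeFormatterBuilder}
import java.time.temporal.ChronoField

// Hypothetical helper for the timestamp checkpoint file (illustration only).
object CheckpointStamp {
  // Writer: always 3 fractional digits plus a "+HH:MM" offset
  // ("xxx" prints "+00:00" for UTC, whereas "XXX" would print "Z").
  val writeFormat: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSxxx")

  // Reader: accepts 0 to 9 fractional digits, so it also parses strings built
  // from java.sql.Timestamp.toString + "+00:00" (e.g. "2024-01-15 10:23:45.1+00:00").
  val readFormat: DateTimeFormatter = new DateTimeFormatterBuilder()
    .appendPattern("yyyy-MM-dd HH:mm:ss")
    .appendFraction(ChronoField.NANO_OF_SECOND, 0, 9, true)
    .appendOffset("+HH:MM", "+00:00")
    .toFormatter()

  // Current instant rendered in UTC for the checkpoint file.
  def now(clock: Clock = Clock.systemUTC()): String =
    OffsetDateTime.now(clock).withOffsetSameInstant(ZoneOffset.UTC).format(writeFormat)

  // Parses either shape; trim guards against a trailing newline in the file.
  def parse(s: String): OffsetDateTime = OffsetDateTime.parse(s.trim, readFormat)
}
```

In the notebook this would replace the `emptyDF` round trip with something like `dbutils.fs.put(tbl_version_timestamp_path, CheckpointStamp.now(), true)`, and the read side with `CheckpointStamp.parse(dbutils.fs.head(tbl_version_timestamp_path))`.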

The problem is that the job times out before the last command, so the final checkpoint write never runs. I have tried a retry mechanism as well, but it still times out. The source data volume is huge, and if the checkpoint is not written properly I don't want to stream duplicate data by running the notebook again and again. How do I solve this? I expect the job to run and store the last checkpoint timestamp correctly so that the next run picks up from there. The error I am getting is:

ERROR: Some streams terminated before this command could finish!
Stream attempt 1 failed, retrying in 2 seconds...
Stream attempt 2 failed, retrying in 4 seconds...
retry: Boolean = true
retryCount: Int = 3
maxRetries: Int = 3
ERROR: Some streams terminated before this command could finish!
Command took 0.04 seconds
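The output above shows which attempts failed but not why, because the `catch` block never logs `e`, and after the third failure the loop ends silently with `retry` still `true`. A minimal sketch of the same linear backoff (2 s, then 4 s) as a standalone helper that prints each exception and returns the final failure instead of swallowing it. `Retry.withLinearBackoff` is a hypothetical helper name, not an existing Spark or Event Hubs API.

```scala
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

// Hypothetical retry helper (illustration only).
object Retry {
  /** Runs `action` up to `maxAttempts` times. After each failed attempt except
    * the last it sleeps `baseDelay * attempt` (2 s, 4 s, ... for 2.seconds).
    * Returns the first Success, or the Failure from the last attempt. */
  def withLinearBackoff[T](maxAttempts: Int, baseDelay: FiniteDuration)(action: => T): Try[T] = {
    @annotation.tailrec
    def loop(attempt: Int): Try[T] = Try(action) match {
      case success @ Success(_) => success
      case Failure(e) if attempt < maxAttempts =>
        val delay = baseDelay * attempt.toLong
        // Log the cause, which the original catch block discards
        println(s"Attempt $attempt failed (${e.getClass.getSimpleName}: ${e.getMessage}), retrying in ${delay.toSeconds} seconds...")
        Thread.sleep(delay.toMillis)
        loop(attempt + 1)
      case failure => failure // out of attempts: surface the last error
    }
    loop(1)
  }
}
```

In the notebook, the body of the existing `try` block (the `writeStream ... start()` and `awaitTermination()` calls) would be passed as `action`, and the checkpoint write would run only when the result is a `Success`.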