
Streaming data from delta table to eventhub after merging data - getting timeout error!!

Ruby8376
Valued Contributor

Here is my code to write data from a Delta table to Event Hub (from where the consumer team will consume the data):

    import org.apache.spark.eventhubs._
    import org.apache.spark.sql.streaming.Trigger._  
    import org.apache.spark.sql.types._  
    import org.apache.spark.sql.functions._  
    import java.util.Properties  
    import com.microsoft.azure.eventhubs.{ EventData, PartitionSender }  
    import org.apache.spark.eventhubs.EventHubsConf  
    import io.delta.tables._  
    import org.apache.spark.sql.streaming.Trigger  
    import java.io.PrintWriter  
    import java.time.ZonedDateTime  
    import java.time.format.DateTimeFormatter  
    import scala.concurrent.duration._  
    import java.nio.file.{Paths, Files}    

    // Configure Azure Event Hub details  
    val namespaceNameOut = "des-ent-prod-cus-stream-eventhub-001"  
    val eventHubNameOut = "hvc-prodstats-output"  
    val sasKeyNameOut = "writer"  
    val sasKeyOut = dbutils.secrets.get(scope="deskvscope", key="des-ent-prod-cus-stream-eventhub-001-writer")  

    // Configure checkpoint and bad data paths
    val checkpoint_dir_path = "/mnt/hvc-wenco/prodstats/stream/checkpoints"
    val baddata_path = "/mnt/hvc-wenco/prodstats/stream/Bad_data"

    // Define timestamp checkpoint path  
    val tbl_version_timestamp_path = "/mnt/hvc-wenco/equip_status_history_table/checkpoints/checkpoint"  

    // Configure other parameters  
    val MaxEvents = 5000  

    // Read the last checkpoint timestamp  
    val last_checkpoint_string = dbutils.fs.head(tbl_version_timestamp_path)  

    // Parse last checkpoint timestamp  
    val time_format = "yyyy-MM-dd HH:mm:ss.SSSz"  
    val formatter = DateTimeFormatter.ofPattern(time_format)  
    val last_checkpoint = ZonedDateTime.parse(last_checkpoint_string, formatter)  

    // Build connection to Event Hub  
    val connStrOut = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()  
            .setNamespaceName(namespaceNameOut)  
            .setEventHubName(eventHubNameOut)  
            .setSasKeyName(sasKeyNameOut)  
            .setSasKey(sasKeyOut)  
    val ehWriteConf = EventHubsConf(connStrOut.toString())  

    // Create a streaming dataframe from the Delta table  
    val InputStreamingDF =   
      spark  
        .readStream  
        .option("maxFilesPerTrigger", 1)  
        .option("startingTimestamp", last_checkpoint_string)  
        .option("readChangeFeed", "true")
        .table("wencohvc.equip_status_history_table")  
    val dropPreTransform = InputStreamingDF.filter(InputStreamingDF("_change_type") =!= "update_preimage")  
    val operationTransform = dropPreTransform.withColumn("operation", when($"_change_type" === "insert", 2).otherwise(when($"_change_type" === "update_postimage", 4)))  

    val transformedDF = operationTransform.withColumn("DeletedIndicator", when($"_change_type" === "delete", "Y").otherwise("N"))  
    val finalDF = transformedDF.drop("_change_type", "_commit_version", "_commit_timestamp")  

    // Write to Event Hubs with retry and checkpointing  
    var retry = true  
    var retryCount = 0  
    val maxRetries = 3  

    while (retry && retryCount < maxRetries) {  
      try {  
        val stream = finalDF  
          .select(to_json(struct(/* column list */)).alias("body"))  
          .writeStream  
          .format("eventhubs")  
          .options(ehWriteConf.toMap)  
          .option("checkpointLocation", checkpoint_dir_path)  
          .trigger(Trigger.AvailableNow)  
          .start()  

        stream.awaitTermination()  
        retry = false  
      } catch {  
        case e: Exception =>  
          retryCount += 1  
          if (retryCount < maxRetries) {  
            val delay = 2.seconds * retryCount  
            println(s"Stream attempt $retryCount failed, retrying in ${delay.toSeconds} seconds...")  
            Thread.sleep(delay.toMillis)  
          }  
      }  
    }  

    // Write checkpoint  
    val emptyDF = Seq((1)).toDF("seq")  
    val checkpoint_timestamp = emptyDF.withColumn("current_timestamp", current_timestamp()).first().getTimestamp(1) + "+00:00"  
    dbutils.fs.put(tbl_version_timestamp_path, checkpoint_timestamp.toString(), true)   

The problem is that it times out before the last command, so the final checkpoint command does not run. I have tried the retry mechanism as well, but it still times out. The data volume from the source is huge, and I don't want to stream duplicate data by running the notebook again and again if proper checkpointing is not happening. How do I solve this issue? I expect the job to run and store the last checkpoint timestamp correctly, so it picks up from there in the next run, but it times out before the last command. The error I am getting is:

ERROR: Some streams terminated before this command could finish!
Stream attempt 1 failed, retrying in 2 seconds...
Stream attempt 2 failed, retrying in 4 seconds...
retry: Boolean = true
retryCount: Int = 3
maxRetries: Int = 3
ERROR: Some streams terminated before this command could finish!
Command took 0.04 seconds

 


11 REPLIES

Ruby8376
Valued Contributor

@Ryan_Chynoweth Can you help?

Ruby8376
Valued Contributor

@hubert_dudek Can you help?

Ruby8376
Valued Contributor

@Anonymous can you help?

-werners-
Esteemed Contributor III

My first reaction is: why the loop?
The streaming query should send all data to the sink.
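
For illustration, a single run without the loop might look roughly like this (a sketch only, reusing finalDF, ehWriteConf and checkpoint_dir_path from your post; Trigger.AvailableNow processes whatever data is available and then stops on its own):

    // Sketch: one writeStream call, no retry loop.
    val query = finalDF
      .select(to_json(struct(finalDF.columns.map(col): _*)).alias("body"))  // placeholder: serialize whichever columns you actually need
      .writeStream
      .format("eventhubs")
      .options(ehWriteConf.toMap)
      .option("checkpointLocation", checkpoint_dir_path)
      .trigger(Trigger.AvailableNow())
      .start()

    query.awaitTermination()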

Ruby8376
Valued Contributor

Yeah, initially there was no loop. I had to add it as a retry mechanism so it does not time out before finishing the write operation.

-werners-
Esteemed Contributor III

Well, get rid of it and let's try to find the cause.

Can you set maxFilesPerTrigger to the default (1000)?

Also check the points that Kaniz mentioned.

My feeling is that too much data is processed at once.

-werners-
Esteemed Contributor III

And also try maxBytesPerTrigger.
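
For example, on the read side (a sketch only, based on the readStream from your post; the values are placeholders to tune):

    // Sketch: control the micro-batch size at the Delta source instead of retrying the write.
    val InputStreamingDF =
      spark
        .readStream
        .option("readChangeFeed", "true")
        .option("startingTimestamp", last_checkpoint_string)
        .option("maxFilesPerTrigger", 1000)     // the Delta source default
        .option("maxBytesPerTrigger", "512m")   // example value; soft cap on the data processed per micro-batch
        .table("wencohvc.equip_status_history_table")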

 

Ruby8376
Valued Contributor

Job aborted due to stage failure: Task 0 in stage 24.0 failed 4 times, most recent failure: Lost task 0.3 in stage 24.0 (TID 44) (10.188.133.16 executor 0): com.microsoft.azure.eventhubs.TimeoutException: Entity(qb2-testprodstats-output): Send operation timed out at 2023-08-09T16:42:30.177Z[Etc/UTC]., errorContext[NS: des-ent-prod-cus-stream-eventhub-001.servicebus.windows.net, PATH: qb2-testprodstats-output, REFERENCE_ID: 479AC68402153E0B02153E2864D3C08F_G2, LINK_CREDIT: 0]
at com.microsoft.azure.eventhubs.impl.MessageSender.throwSenderTimeout(MessageSender.java:947)
at com.microsoft.azure.eventhubs.impl.MessageSender.access$1800(MessageSender.java:62)
at com.microsoft.azure.eventhubs.impl.MessageSender$SendTimeout.run(MessageSender.java:1069)
at com.microsoft.azure.eventhubs.impl.Timer$ScheduledTask.onEvent(Timer.java:48)
at com.microsoft.azure.eventhubs.impl.DispatchHandler.onTimerTask(DispatchHandler.java:12)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:233)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

Getting this error after making the changes.

Thanks Kaniz!

-werners-
Esteemed Contributor III

Ok, the error message clearly states a timeout on the Event Hub side.
So the best way to approach this is to look at the Event Hub config (instead of going through all kinds of tricks in Spark).
Try setting the receiverTimeout/operationTimeout in the connection string, or the maxRatePerPartition.
For the first write, the whole Delta table will be read, so Event Hub should be ready for what is in fact a large batch load.
I don't have a clear answer on which setting will work in your situation, so you will have to try out some values (perhaps even combined with the Spark streaming settings mentioned before).

event hub config for spark streaming 
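
For example, something along these lines (a sketch only; setOperationTimeout and setReceiverTimeout are setters on EventHubsConf in the azure-event-hubs-spark connector, the last line assumes your connector version exposes setMaxRatePerPartition, and all values are just starting points to experiment with):

    import java.time.Duration

    // Sketch: give the Event Hubs client more time per operation and cap the per-partition rate.
    val ehWriteConf = EventHubsConf(connStrOut.toString())
      .setOperationTimeout(Duration.ofMinutes(5))   // example value; how long a single send/receive operation may take
      .setReceiverTimeout(Duration.ofMinutes(5))    // example value
      .setMaxRatePerPartition(10000)                // example value; mainly relevant on the read side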

Ruby8376
Valued Contributor

Thank you @-werners- 
