Streaming data from delta table to eventhub after merging data - getting timeout error!!

Ruby8376
Valued Contributor

Here is my code to write data from a Delta table to an Event Hub, from where the consumer team will consume the data:

    import org.apache.spark.eventhubs._  
    import org.apache.spark.sql.streaming.Trigger._  
    import org.apache.spark.sql.types._  
    import org.apache.spark.sql.functions._  
    import java.util.Properties  
    import com.microsoft.azure.eventhubs.{ EventData, PartitionSender }  
    import org.apache.spark.eventhubs.EventHubsConf  
    import io.delta.tables._  
    import org.apache.spark.sql.streaming.Trigger  
    import java.io.PrintWriter  
    import java.time.ZonedDateTime  
    import java.time.format.DateTimeFormatter  
    import scala.concurrent.duration._  
    import java.nio.file.{Paths, Files}    

    // Configure Azure Event Hub details  
    val namespaceNameOut = "des-ent-prod-cus-stream-eventhub-001"  
    val eventHubNameOut = "hvc-prodstats-output"  
    val sasKeyNameOut = "writer"  
    val sasKeyOut = dbutils.secrets.get(scope="deskvscope", key="des-ent-prod-cus-stream-eventhub-001-writer")  

    // Configure checkpoint and bad data paths
    val checkpoint_dir_path = "/mnt/hvc-wenco/prodstats/stream/checkpoints"
    val baddata_path = "/mnt/hvc-wenco/prodstats/stream/Bad_data"

    // Define timestamp checkpoint path  
    val tbl_version_timestamp_path = "/mnt/hvc-wenco/equip_status_history_table/checkpoints/checkpoint"  

    // Configure other parameters  
    val MaxEvents = 5000  

    // Read the last checkpoint timestamp  
    val last_checkpoint_string = dbutils.fs.head(tbl_version_timestamp_path)  

    // Parse last checkpoint timestamp  
    val time_format = "yyyy-MM-dd HH:mm:ss.SSSz"  
    val formatter = DateTimeFormatter.ofPattern(time_format)  
    val last_checkpoint = ZonedDateTime.parse(last_checkpoint_string, formatter)  

    // Build connection to Event Hub  
    val connStrOut = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()  
            .setNamespaceName(namespaceNameOut)  
            .setEventHubName(eventHubNameOut)  
            .setSasKeyName(sasKeyNameOut)  
            .setSasKey(sasKeyOut)  
    val ehWriteConf = EventHubsConf(connStrOut.toString())  

    // Create a streaming dataframe from the Delta table  
    val InputStreamingDF =   
      spark  
        .readStream  
        .option("maxFilesPerTrigger", 1)  
        .option("startingTimestamp", last_checkpoint_string)  
        .option("readChangeFeed", "true")
        .table("wencohvc.equip_status_history_table")  
    val dropPreTransform = InputStreamingDF.filter(InputStreamingDF("_change_type") =!= "update_preimage")  
    val operationTransform = dropPreTransform.withColumn("operation", when($"_change_type" === "insert", 2).otherwise(when($"_change_type" === "update_postimage", 4)))  

    val transformedDF = operationTransform.withColumn("DeletedIndicator", when($"_change_type" === "delete", "Y").otherwise("N"))  
    val finalDF = transformedDF.drop("_change_type", "_commit_version", "_commit_timestamp")  

    // Write to Event Hubs with retry and checkpointing  
    var retry = true  
    var retryCount = 0  
    val maxRetries = 3  

    while (retry && retryCount < maxRetries) {  
      try {  
        val stream = finalDF  
          .select(to_json(struct(/* column list */)).alias("body"))  
          .writeStream  
          .format("eventhubs")  
          .options(ehWriteConf.toMap)  
          .option("checkpointLocation", checkpoint_dir_path)  
          .trigger(Trigger.AvailableNow)  
          .start()  

        stream.awaitTermination()  
        retry = false  
      } catch {  
        case e: Exception =>  
          retryCount += 1  
          if (retryCount < maxRetries) {  
            val delay = 2.seconds * retryCount  
            println(s"Stream attempt $retryCount failed, retrying in ${delay.toSeconds} seconds...")  
            Thread.sleep(delay.toMillis)  
          }  
      }  
    }  

    // Write checkpoint  
    val emptyDF = Seq((1)).toDF("seq")  
    val checkpoint_timestamp = emptyDF.withColumn("current_timestamp", current_timestamp()).first().getTimestamp(1) + "+00:00"  
    dbutils.fs.put(tbl_version_timestamp_path, checkpoint_timestamp.toString(), true)   

The problem is that the job times out before the last command, so the final checkpoint command never runs. I have tried the retry mechanism as well, but it still times out. The data volume from the source is huge, and I don't want to stream duplicate data by re-running the notebook if the checkpoint is not written properly. How do I solve this issue? I expect the job to run and store the last checkpoint timestamp correctly, so that the next run picks up from there, but it times out before the last command. The error I am getting is:

ERROR: Some streams terminated before this command could finish!
Stream attempt 1 failed, retrying in 2 seconds...
Stream attempt 2 failed, retrying in 4 seconds...
retry: Boolean = true
retryCount: Int = 3
maxRetries: Int = 3
ERROR: Some streams terminated before this command could finish!
Command took 0.04 seconds

 


13 REPLIES

Ruby8376
Valued Contributor

@Ryan_Chynoweth Can you help?

Ruby8376
Valued Contributor

@hubert_dudek Can you help?

Ruby8376
Valued Contributor

@WernerS can you help?

-werners-
Esteemed Contributor III

My first reaction is: why the loop?
The streaming query should send all data to the sink.

Ruby8376
Valued Contributor

Yeah, initially there was no loop. I had to add it as a retry mechanism so it does not time out before finishing the write operation.

-werners-
Esteemed Contributor III

Well, get rid of it and let's try to find the cause.

Can you set maxFilesPerTrigger back to the default (1000)?

Also check the points that Kaniz mentioned.

My feeling is that too much data is processed at once.

-werners-
Esteemed Contributor III

and also try with maxBytesPerTrigger
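
For illustration, the read side of the stream from the original notebook could look like this. It is only a sketch that reuses spark and last_checkpoint_string from the cells above; maxFilesPerTrigger and maxBytesPerTrigger are standard Delta streaming source options, but the values shown are placeholders to tune, not recommendations:

    // Sketch: same change-feed read as in the original notebook, but without
    // the maxFilesPerTrigger = 1 cap that forces one file per micro-batch.
    val InputStreamingDF =
      spark
        .readStream
        .option("readChangeFeed", "true")
        .option("startingTimestamp", last_checkpoint_string)
        .option("maxFilesPerTrigger", "1000")   // back to the default file cap
        .option("maxBytesPerTrigger", "512m")   // optionally also cap each micro-batch by size
        .table("wencohvc.equip_status_history_table")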

 

Ruby8376
Valued Contributor

Job aborted due to stage failure: Task 0 in stage 24.0 failed 4 times, most recent failure: Lost task 0.3 in stage 24.0 (TID 44) (10.188.133.16 executor 0): com.microsoft.azure.eventhubs.TimeoutException: Entity(qb2-testprodstats-output): Send operation timed out at 2023-08-09T16:42:30.177Z[Etc/UTC]., errorContext[NS: des-ent-prod-cus-stream-eventhub-001.servicebus.windows.net, PATH: qb2-testprodstats-output, REFERENCE_ID: 479AC68402153E0B02153E2864D3C08F_G2, LINK_CREDIT: 0]
at com.microsoft.azure.eventhubs.impl.MessageSender.throwSenderTimeout(MessageSender.java:947)
at com.microsoft.azure.eventhubs.impl.MessageSender.access$1800(MessageSender.java:62)
at com.microsoft.azure.eventhubs.impl.MessageSender$SendTimeout.run(MessageSender.java:1069)
at com.microsoft.azure.eventhubs.impl.Timer$ScheduledTask.onEvent(Timer.java:48)
at com.microsoft.azure.eventhubs.impl.DispatchHandler.onTimerTask(DispatchHandler.java:12)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:233)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)

I am getting this error after making the changes.

Kaniz
Community Manager
Community Manager

Hi @Ruby8376, the issue you are facing is that the streaming job is timing out before the last command, so the final checkpoint command does not run.

To solve this issue, try the following steps:

1. Increase the timeout duration of the streaming job by setting the spark.sql.streaming.stopTimeout configuration property. You can set it to a higher value, such as 600s (10 minutes), to allow more time for the job to complete (a minimal example is sketched after this reply).

2. Check the resources allocated to the streaming job. If the job runs out of resources, such as memory or CPU, it may be causing the timeout. You can try increasing the resources allocated to the job, such as the number of executors or the memory per executor, to see if it resolves the issue.

3. Verify that the checkpoint directory and bad-data paths are accessible and have the necessary permissions. If there are any issues with accessing or writing to these paths, it can cause the job to fail or time out. Ensure that the paths exist and that the user running the job has the necessary permissions to read from and write to them.

4. Check the network connectivity between your Databricks cluster and the Azure Event Hub. If there are any network issues or connectivity problems, it can cause the streaming job to fail or time out. Ensure that the cluster has access to the Event Hub and that no firewall or network restrictions block the connection.

5. Review the logs and error messages generated by the streaming job to identify any specific errors or issues causing the timeout. Your error message indicates that some streams terminated before the command could finish, but it doesn't provide specific details about the cause. Analyzing the logs and error messages can help you pinpoint the exact issue and take appropriate actions to resolve it.

Sources:
Docs: event-hubs
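
For step 1, a minimal sketch of how that property could be set from the notebook before starting the query (this assumes the usual way of setting session-level Spark SQL configs in a Databricks notebook; the 600s value is only an example to tune):

    // Sketch: raise the streaming stop timeout for this Spark session.
    // spark.sql.streaming.stopTimeout controls how long Spark waits for the
    // streaming execution thread to stop; "600s" is an example value only.
    spark.conf.set("spark.sql.streaming.stopTimeout", "600s")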

Kaniz
Community Manager
Community Manager

Hi @Ruby8376, Help us build a vibrant and resourceful community by recognizing and highlighting insightful contributions. Please accept the solution and show your appreciation!

Ruby8376
Valued Contributor

Thanks Kaniz!

-werners-
Esteemed Contributor III

Ok, the error message clearly states a timeout on the event hub side.
So the best way to approach this is to look at event hub config (instead of going through all kinds of tricks in spark).
Try to set the receiverTimeout/operationTimeout in the connection string, or the maxRatePerPartition.
For the first write, the whole Delta table will be read, so Event Hub should be ready for what is in fact a large batch load.
I don't have a clear answer on what setting will work in your situation, so you will have to try out some values (perhaps even combined with spark streaming settings as mentioned before).

event hub config for spark streaming 
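
For reference, a rough sketch of what that could look like on the write connection from the original notebook. It assumes the setOperationTimeout setters exposed by the Event Hubs Java client's ConnectionStringBuilder and by the connector's EventHubsConf (check the documentation for your connector version), and the 10-minute value is just a placeholder to tune:

    import java.time.Duration

    // Sketch: same connection as in the original notebook, with a longer
    // client-side operation (send) timeout for the initial large batch load.
    val connStrOut = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
      .setNamespaceName(namespaceNameOut)
      .setEventHubName(eventHubNameOut)
      .setSasKeyName(sasKeyNameOut)
      .setSasKey(sasKeyOut)
      .setOperationTimeout(Duration.ofMinutes(10))   // example value, tune as needed

    val ehWriteConf = EventHubsConf(connStrOut.toString())
      .setOperationTimeout(Duration.ofMinutes(10))   // connector-side equivalent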

Ruby8376
Valued Contributor

Thank you @-werners- 
