08-08-2023 06:41 PM
Here is my code to write data from a delta table to event hub (from where consumer team will consume data):
import org.apache.spark.eventhubs._
import org.apache.spark.sql.streaming.Trigger._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import java.util.Properties
import com.microsoft.azure.eventhubs.{ EventData, PartitionSender }
import org.apache.spark.eventhubs.EventHubsConf
import io.delta.tables._
import org.apache.spark.sql.streaming.Trigger
import java.io.PrintWriter
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import scala.concurrent.duration._
import java.nio.file.{Paths, Files}

// Configure Azure Event Hub details
val namespaceNameOut = "des-ent-prod-cus-stream-eventhub-001"
val eventHubNameOut = "hvc-prodstats-output"
val sasKeyNameOut = "writer"
val sasKeyOut = dbutils.secrets.get(scope="deskvscope", key="des-ent-prod-cus-stream-eventhub-001-writer")

// Configure checkpoint and bad data paths
val checkpoint_dir_path = "/mnt/hvc-wenco/prodstats/stream/checkpoints"
val baddata_path = "/mnt/hvc-wenco/prodstats/stream/Bad_data"

// Define timestamp checkpoint path
val tbl_version_timestamp_path = "/mnt/hvc-wenco/equip_status_history_table/checkpoints/checkpoint"

// Configure other parameters
val MaxEvents = 5000

// Read the last checkpoint timestamp
val last_checkpoint_string = dbutils.fs.head(tbl_version_timestamp_path)

// Parse last checkpoint timestamp
val time_format = "yyyy-MM-dd HH:mm:ss.SSSz"
val formatter = DateTimeFormatter.ofPattern(time_format)
val last_checkpoint = ZonedDateTime.parse(last_checkpoint_string, formatter)

// Build connection to Event Hub
val connStrOut = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
  .setNamespaceName(namespaceNameOut)
  .setEventHubName(eventHubNameOut)
  .setSasKeyName(sasKeyNameOut)
  .setSasKey(sasKeyOut)
val ehWriteConf = EventHubsConf(connStrOut.toString())

// Create a streaming dataframe from the Delta table
val InputStreamingDF = spark
  .readStream
  .option("maxFilesPerTrigger", 1)
  .option("startingTimestamp", last_checkpoint_string)
  .option("readChangeFeed", "true")
  .table("wencohvc.equip_status_history_table")

val dropPreTransform = InputStreamingDF.filter(InputStreamingDF("_change_type") =!= "update_preimage")
val operationTransform = dropPreTransform.withColumn("operation", when($"_change_type" === "insert", 2).otherwise(when($"_change_type" === "update_postimage", 4)))
val transformedDF = operationTransform.withColumn("DeletedIndicator", when($"_change_type" === "delete", "Y").otherwise("N"))
val finalDF = transformedDF.drop("_change_type", "_commit_version", "_commit_timestamp")

// Write to Event Hubs with retry and checkpointing
var retry = true
var retryCount = 0
val maxRetries = 3
while (retry && retryCount < maxRetries) {
  try {
    val stream = finalDF
      .select(to_json(struct(/* column list */)).alias("body"))
      .writeStream
      .format("eventhubs")
      .options(ehWriteConf.toMap)
      .option("checkpointLocation", checkpoint_dir_path)
      .trigger(Trigger.AvailableNow)
      .start()
    stream.awaitTermination()
    retry = false
  } catch {
    case e: Exception =>
      retryCount += 1
      if (retryCount < maxRetries) {
        val delay = 2.seconds * retryCount
        println(s"Stream attempt $retryCount failed, retrying in ${delay.toSeconds} seconds...")
        Thread.sleep(delay.toMillis)
      }
  }
}

// Write checkpoint
val emptyDF = Seq((1)).toDF("seq")
val checkpoint_timestamp = emptyDF.withColumn("current_timestamp", current_timestamp()).first().getTimestamp(1) + "+00:00"
dbutils.fs.put(tbl_version_timestamp_path, checkpoint_timestamp.toString(), true)
The problem is that the job times out before the last command, so the final checkpoint command does not run. I have tried a retry mechanism as well, but it still times out. The data volume from the source is huge, and I don't want to stream duplicate data by rerunning the notebook again and again when the checkpoint is not written properly. How do I solve this issue? I expect the job to run and store the last checkpoint timestamp correctly, so that it picks up from there in the next run, but it times out before the last command. The error I am getting is:
ERROR: Some streams terminated before this command could finish!
Stream attempt 1 failed, retrying in 2 seconds...
Stream attempt 2 failed, retrying in 4 seconds...
retry: Boolean = true
retryCount: Int = 3
maxRetries: Int = 3
ERROR: Some streams terminated before this command could finish!
Command took 0.04 seconds
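Since the concern in the question is that the stored timestamp must only advance after a successful run, here is a minimal, Spark-free sketch of one way to guard the checkpoint write. The object `GuardedCheckpoint` and the helpers `retrying` and `runAndCheckpoint` are hypothetical names, not part of any library. In the notebook, `runStream` would presumably wrap the `writeStream ... start()` and `awaitTermination()` calls, and `writeCheckpoint` the `dbutils.fs.put` call.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

object GuardedCheckpoint {
  // Hypothetical helper: runs `attempt` up to `maxRetries` times and returns
  // the final outcome instead of swallowing the last exception.
  def retrying[A](maxRetries: Int, baseDelayMillis: Long)(attempt: () => A): Try[A] = {
    @tailrec
    def loop(n: Int): Try[A] = Try(attempt()) match {
      case s @ Success(_) => s
      case Failure(e) if n < maxRetries =>
        println(s"Attempt $n failed (${e.getMessage}), retrying...")
        Thread.sleep(baseDelayMillis * n) // linear backoff, as in the question
        loop(n + 1)
      case f @ Failure(_) => f
    }
    loop(1)
  }

  // Writes the checkpoint only when the stream run succeeded;
  // returns true if the checkpoint was written.
  def runAndCheckpoint(runStream: () => Unit, writeCheckpoint: () => Unit): Boolean =
    retrying(maxRetries = 3, baseDelayMillis = 10L)(runStream) match {
      case Success(_) =>
        writeCheckpoint()
        true
      case Failure(e) =>
        println(s"Giving up, checkpoint left untouched: ${e.getMessage}")
        false
    }
}
```

With this shape, a run that fails on every attempt leaves the old timestamp in place, so the next run starts from the same point rather than skipping data.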
08-09-2023 12:27 AM
Hi @Ruby8376, The issue you are facing is that the streaming job times out before the last command, so the final checkpoint command does not run.
To solve this issue, try the following steps:
1. Increase the timeout duration of the streaming job by setting the spark.sql.streaming.stopTimeout configuration property. You can set it to a higher value, such as 600s (10 minutes), to allow more time for the job to complete.
2. Check the resources allocated to the streaming job. If the job runs out of resources, such as memory or CPU, it may be causing the timeout. You can try increasing the resources allocated to the job, such as the number of executors or the memory per executor, to see if it resolves the issue.
3. Verify that the checkpoint directory and bad data paths are accessible and have the necessary permissions. If there are any issues with accessing or writing to these paths, it can cause the job to fail or time out. Ensure that the paths exist and that the user running the job has the necessary permissions to read from and write to them.
4. Check the network connectivity between your Databricks cluster and the Azure Event Hub. If there are any network issues or connectivity problems, it can cause the streaming job to fail or time out. Ensure that the cluster has access to the Event Hub and that no firewall or network restrictions block the connection.
5. Review the logs and error messages generated by the streaming job to identify any specific errors or issues causing the timeout. Your error message indicates that some streams terminated before the command could finish, but it doesn't provide specific details about the cause. Analyzing the logs and error messages can help you pinpoint the exact issue and take appropriate action to resolve it.
Sources:
- Docs: event-hubs
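Step 1 could be applied from a notebook cell roughly as follows. This is a sketch that assumes a Databricks or Spark session is available. As far as I know, this setting controls how long `stop()` waits for the query thread to terminate, so it may not change a send timeout raised by the sink itself.

```scala
import org.apache.spark.sql.SparkSession

// In a Databricks notebook `spark` already exists; getOrCreate() returns it.
val spark = SparkSession.builder().getOrCreate()

// Duration string; "600s" corresponds to the 10 minutes suggested in step 1.
spark.conf.set("spark.sql.streaming.stopTimeout", "600s")

// Read the value back to confirm it was applied to this session.
println(spark.conf.get("spark.sql.streaming.stopTimeout"))
```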
08-09-2023 11:54 PM
Ok, the error message clearly states a timeout on the event hub side.
So the best way to approach this is to look at event hub config (instead of going through all kinds of tricks in spark).
Try to set the receiverTimeout/operationTimeout in the connection string, or the maxRatePerPartition.
For the first write, the whole delta table will be read so event hub should be ready for what is in fact a large batch load.
I don't have a clear answer on what setting will work in your situation, so you will have to try out some values (perhaps even combined with spark streaming settings as mentioned before).
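A sketch of what setting these timeouts might look like, assuming the azure-event-hubs-spark connector used in the question. The function name `writeConfWithTimeouts` and the five-minute value are illustrative only; suitable values have to be found by experiment, as noted above.

```scala
import java.time.Duration
import org.apache.spark.eventhubs.EventHubsConf

// Hypothetical helper: builds the write configuration with longer timeouts.
// `connectionString` would be connStrOut.toString() from the question's code.
def writeConfWithTimeouts(connectionString: String, timeout: Duration): EventHubsConf =
  EventHubsConf(connectionString)
    .setOperationTimeout(timeout) // how long Event Hubs API calls may take
    .setReceiverTimeout(timeout)  // how long receive calls may take

// Example usage with an illustrative value:
// val ehWriteConf = writeConfWithTimeouts(connStrOut.toString(), Duration.ofMinutes(5))
```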
08-08-2023 06:42 PM
@Ryan_Chynoweth Can you help?
08-08-2023 06:44 PM
@hubert_dudek Can you help?
08-08-2023 06:45 PM
@WernerS can you help?
08-09-2023 12:09 AM
My first reaction is: why the loop?
The streaming query should send all data to the sink.
08-09-2023 05:48 AM
Yes, initially there was no loop. I had to add it as a retry mechanism so that the job does not time out before finishing the write operation.
08-09-2023 05:53 AM
Well, get rid of it and let's try to find the cause.
Can you set maxFilesPerTrigger to the default (1000)?
Also check the points that Kaniz mentioned.
My feeling is that too much data is processed at once.
08-09-2023 05:57 AM
Also try maxBytesPerTrigger.
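The two rate-limit options suggested here could be set on the Delta change feed read like this. It is a sketch under assumptions: the function name `readChangeFeed` is hypothetical, and the `1g` cap is an illustrative value, not a recommendation.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical helper: reads the table's change feed with explicit batch limits.
def readChangeFeed(spark: SparkSession, startingTimestamp: String): DataFrame =
  spark.readStream
    .option("readChangeFeed", "true")
    .option("startingTimestamp", startingTimestamp)
    .option("maxFilesPerTrigger", 1000L) // the default mentioned above
    .option("maxBytesPerTrigger", "1g")  // illustrative soft cap per micro-batch
    .table("wencohvc.equip_status_history_table")
```

Lowering either limit makes each micro-batch smaller, which may help if the sink cannot absorb a large batch within its send timeout.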
08-09-2023 09:53 AM
Job aborted due to stage failure: Task 0 in stage 24.0 failed 4 times, most recent failure: Lost task 0.3 in stage 24.0 (TID 44) (10.188.133.16 executor 0): com.microsoft.azure.eventhubs.TimeoutException: Entity(qb2-testprodstats-output): Send operation timed out at 2023-08-09T16:42:30.177Z[Etc/UTC]., errorContext[NS: des-ent-prod-cus-stream-eventhub-001.servicebus.windows.net, PATH: qb2-testprodstats-output, REFERENCE_ID: 479AC68402153E0B02153E2864D3C08F_G2, LINK_CREDIT: 0]
at com.microsoft.azure.eventhubs.impl.MessageSender.throwSenderTimeout(MessageSender.java:947)
at com.microsoft.azure.eventhubs.impl.MessageSender.access$1800(MessageSender.java:62)
at com.microsoft.azure.eventhubs.impl.MessageSender$SendTimeout.run(MessageSender.java:1069)
at com.microsoft.azure.eventhubs.impl.Timer$ScheduledTask.onEvent(Timer.java:48)
at com.microsoft.azure.eventhubs.impl.DispatchHandler.onTimerTask(DispatchHandler.java:12)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:233)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
I am getting this error after making the changes.
08-10-2023 07:08 AM
Thanks Kaniz!
08-10-2023 07:07 AM
Thank you @-werners-