08-08-2023 06:41 PM
Here is my code to write data from a Delta table to an Event Hub (from which the consumer team will consume the data):
import org.apache.spark.eventhubs._
import org.apache.spark.sql.streaming.Trigger._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import java.util.Properties
import com.microsoft.azure.eventhubs.{EventData, PartitionSender}
import org.apache.spark.eventhubs.EventHubsConf
import io.delta.tables._
import org.apache.spark.sql.streaming.Trigger
import java.io.PrintWriter
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import scala.concurrent.duration._
import java.nio.file.{Paths, Files}

// Configure Azure Event Hub details
val namespaceNameOut = "des-ent-prod-cus-stream-eventhub-001"
val eventHubNameOut = "hvc-prodstats-output"
val sasKeyNameOut = "writer"
val sasKeyOut = dbutils.secrets.get(scope="deskvscope", key="des-ent-prod-cus-stream-eventhub-001-writer")

// Configure checkpoint and bad data paths
val checkpoint_dir_path = "/mnt/hvc-wenco/prodstats/stream/checkpoints"
val baddata_path = "/mnt/hvc-wenco/prodstats/stream/Bad_data"

// Define timestamp checkpoint path
val tbl_version_timestamp_path = "/mnt/hvc-wenco/equip_status_history_table/checkpoints/checkpoint"

// Configure other parameters
val MaxEvents = 5000

// Read the last checkpoint timestamp
val last_checkpoint_string = dbutils.fs.head(tbl_version_timestamp_path)

// Parse last checkpoint timestamp
val time_format = "yyyy-MM-dd HH:mm:ss.SSSz"
val formatter = DateTimeFormatter.ofPattern(time_format)
val last_checkpoint = ZonedDateTime.parse(last_checkpoint_string, formatter)

// Build connection to Event Hub
val connStrOut = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
  .setNamespaceName(namespaceNameOut)
  .setEventHubName(eventHubNameOut)
  .setSasKeyName(sasKeyNameOut)
  .setSasKey(sasKeyOut)

val ehWriteConf = EventHubsConf(connStrOut.toString())

// Create a streaming dataframe from the Delta table
val InputStreamingDF = spark
  .readStream
  .option("maxFilesPerTrigger", 1)
  .option("startingTimestamp", last_checkpoint_string)
  .option("readChangeFeed", "true")
  .table("wencohvc.equip_status_history_table")

val dropPreTransform = InputStreamingDF.filter(InputStreamingDF("_change_type") =!= "update_preimage")
val operationTransform = dropPreTransform.withColumn("operation", when($"_change_type" === "insert", 2).otherwise(when($"_change_type" === "update_postimage", 4)))
val transformedDF = operationTransform.withColumn("DeletedIndicator", when($"_change_type" === "delete", "Y").otherwise("N"))
val finalDF = transformedDF.drop("_change_type", "_commit_version", "_commit_timestamp")

// Write to Event Hubs with retry and checkpointing
var retry = true
var retryCount = 0
val maxRetries = 3

while (retry && retryCount < maxRetries) {
  try {
    val stream = finalDF
      .select(to_json(struct(/* column list */)).alias("body"))
      .writeStream
      .format("eventhubs")
      .options(ehWriteConf.toMap)
      .option("checkpointLocation", checkpoint_dir_path)
      .trigger(Trigger.AvailableNow())
      .start()

    stream.awaitTermination()
    retry = false
  } catch {
    case e: Exception =>
      retryCount += 1
      if (retryCount < maxRetries) {
        val delay = 2.seconds * retryCount
        println(s"Stream attempt $retryCount failed, retrying in ${delay.toSeconds} seconds...")
        Thread.sleep(delay.toMillis)
      }
  }
}

// Write checkpoint
val emptyDF = Seq((1)).toDF("seq")
val checkpoint_timestamp = emptyDF.withColumn("current_timestamp", current_timestamp()).first().getTimestamp(1) + "+00:00"
dbutils.fs.put(tbl_version_timestamp_path, checkpoint_timestamp.toString(), true)
The problem is that the job times out before the last command, so the final checkpoint write never runs. I have tried a retry mechanism as well, but it still times out. The data volume from the source is huge, and I don't want to stream duplicate data by re-running the notebook again and again if proper checkpointing is not happening. How do I solve this issue? I expect the job to run and store the last checkpoint timestamp correctly, so the next run picks up from there, but it times out before the last command. The error I am getting is:
ERROR: Some streams terminated before this command could finish!
Stream attempt 1 failed, retrying in 2 seconds...
Stream attempt 2 failed, retrying in 4 seconds...
retry: Boolean = true
retryCount: Int = 3
maxRetries: Int = 3
ERROR: Some streams terminated before this command could finish!
Command took 0.04 seconds
08-08-2023 06:42 PM
@Ryan_Chynoweth Can you help?
08-08-2023 06:44 PM
@hubert_dudek Can you help?
08-08-2023 06:45 PM
@Anonymous can you help?
08-09-2023 12:09 AM
My first reaction is: why the loop?
The streaming query should send all data to the sink.
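For illustration, here is a minimal sketch of that write without the retry loop, reusing only names from the code above (the column list stays elided as in the original); with the AvailableNow trigger a single run drains the backlog and stops on its own:

val stream = finalDF
  .select(to_json(struct(/* column list */)).alias("body"))
  .writeStream
  .format("eventhubs")
  .options(ehWriteConf.toMap)
  .option("checkpointLocation", checkpoint_dir_path)
  .trigger(Trigger.AvailableNow())
  .start()

// Blocks until the AvailableNow batch completes; failures surface here instead of being retried in a loop
stream.awaitTermination()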
08-09-2023 05:48 AM
Yeah, initially there was no loop. I had to add it as a retry mechanism so it does not time out before finishing the write operation.
08-09-2023 05:53 AM
Well, get rid of it and let's try to find the cause.
Can you set maxFilesPerTrigger to the default (1000)?
Also check the points that Kaniz mentioned.
My feeling is that too much data is processed at once.
08-09-2023 05:57 AM
And also try maxBytesPerTrigger.
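As an illustration, a sketch of the read side with those rate limits applied (the table name, change-feed options, and startingTimestamp are reused from the code above; the concrete values are only starting points to experiment with):

// Delta streaming source: maxFilesPerTrigger defaults to 1000; maxBytesPerTrigger is a soft cap per micro-batch
val InputStreamingDF = spark
  .readStream
  .option("readChangeFeed", "true")
  .option("startingTimestamp", last_checkpoint_string)
  .option("maxFilesPerTrigger", 1000)    // instead of 1 as in the original code
  .option("maxBytesPerTrigger", "512m")  // illustrative value; tune for your workload
  .table("wencohvc.equip_status_history_table")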
08-09-2023 09:53 AM
Job aborted due to stage failure: Task 0 in stage 24.0 failed 4 times, most recent failure: Lost task 0.3 in stage 24.0 (TID 44) (10.188.133.16 executor 0): com.microsoft.azure.eventhubs.TimeoutException: Entity(qb2-testprodstats-output): Send operation timed out at 2023-08-09T16:42:30.177Z[Etc/UTC]., errorContext[NS: des-ent-prod-cus-stream-eventhub-001.servicebus.windows.net, PATH: qb2-testprodstats-output, REFERENCE_ID: 479AC68402153E0B02153E2864D3C08F_G2, LINK_CREDIT: 0]
at com.microsoft.azure.eventhubs.impl.MessageSender.throwSenderTimeout(MessageSender.java:947)
at com.microsoft.azure.eventhubs.impl.MessageSender.access$1800(MessageSender.java:62)
at com.microsoft.azure.eventhubs.impl.MessageSender$SendTimeout.run(MessageSender.java:1069)
at com.microsoft.azure.eventhubs.impl.Timer$ScheduledTask.onEvent(Timer.java:48)
at com.microsoft.azure.eventhubs.impl.DispatchHandler.onTimerTask(DispatchHandler.java:12)
at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:233)
at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)
at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)
at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
I'm getting this error after making the changes.
08-10-2023 07:08 AM
Thanks Kaniz!
08-09-2023 11:54 PM
OK, the error message clearly states a timeout on the Event Hub side.
So the best approach is to look at the Event Hub configuration (instead of going through all kinds of tricks in Spark).
Try setting receiverTimeout/operationTimeout in the connection string, or maxRatePerPartition.
For the first write the whole Delta table will be read, so Event Hub should be ready for what is in effect a large batch load.
I don't have a definitive answer on which settings will work in your situation, so you will have to try out some values (perhaps combined with the Spark streaming settings mentioned before).
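For reference, a minimal sketch of how those knobs could be set on the EventHubsConf from the code above (the setter names assume the azure-event-hubs-spark connector; the durations and rate are illustrative, not recommendations):

import java.time.Duration
import org.apache.spark.eventhubs.EventHubsConf

val ehWriteConf = EventHubsConf(connStrOut.toString())
  .setOperationTimeout(Duration.ofMinutes(10))  // give slow sends more time before the client throws TimeoutException
  .setReceiverTimeout(Duration.ofMinutes(5))    // receive-side timeout, only relevant when reading from Event Hubs
  .setMaxRatePerPartition(10000)                // read-side throttle, included here because it was suggested above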
08-10-2023 07:07 AM
Thank you @-werners-