08-08-2023 06:41 PM
Here is my code to write data from a Delta table to an Event Hub (from which the consumer team will consume the data):
import org.apache.spark.eventhubs._
import org.apache.spark.sql.streaming.Trigger._
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import java.util.Properties
import com.microsoft.azure.eventhubs.{EventData, PartitionSender}
import org.apache.spark.eventhubs.EventHubsConf
import io.delta.tables._
import org.apache.spark.sql.streaming.Trigger
import java.io.PrintWriter
import java.time.ZonedDateTime
import java.time.format.DateTimeFormatter
import scala.concurrent.duration._
import java.nio.file.{Paths, Files}

// Configure Azure Event Hub details
val namespaceNameOut = "des-ent-prod-cus-stream-eventhub-001"
val eventHubNameOut = "hvc-prodstats-output"
val sasKeyNameOut = "writer"
val sasKeyOut = dbutils.secrets.get(scope = "deskvscope", key = "des-ent-prod-cus-stream-eventhub-001-writer")

// Configure checkpoint and bad data paths
val checkpoint_dir_path = "/mnt/hvc-wenco/prodstats/stream/checkpoints"
val baddata_path = "/mnt/hvc-wenco/prodstats/stream/Bad_data"

// Define timestamp checkpoint path
val tbl_version_timestamp_path = "/mnt/hvc-wenco/equip_status_history_table/checkpoints/checkpoint"

// Configure other parameters
val MaxEvents = 5000

// Read the last checkpoint timestamp
val last_checkpoint_string = dbutils.fs.head(tbl_version_timestamp_path)

// Parse last checkpoint timestamp
val time_format = "yyyy-MM-dd HH:mm:ss.SSSz"
val formatter = DateTimeFormatter.ofPattern(time_format)
val last_checkpoint = ZonedDateTime.parse(last_checkpoint_string, formatter)

// Build connection to Event Hub
val connStrOut = new com.microsoft.azure.eventhubs.ConnectionStringBuilder()
  .setNamespaceName(namespaceNameOut)
  .setEventHubName(eventHubNameOut)
  .setSasKeyName(sasKeyNameOut)
  .setSasKey(sasKeyOut)
val ehWriteConf = EventHubsConf(connStrOut.toString())

// Create a streaming dataframe from the Delta table's change feed
val InputStreamingDF = spark
  .readStream
  .option("maxFilesPerTrigger", 1)
  .option("startingTimestamp", last_checkpoint_string)
  .option("readChangeFeed", "true")
  .table("wencohvc.equip_status_history_table")

// Drop pre-update images and map change types to operation codes
val dropPreTransform = InputStreamingDF.filter(InputStreamingDF("_change_type") =!= "update_preimage")
val operationTransform = dropPreTransform.withColumn("operation",
  when($"_change_type" === "insert", 2).otherwise(when($"_change_type" === "update_postimage", 4)))
val transformedDF = operationTransform.withColumn("DeletedIndicator",
  when($"_change_type" === "delete", "Y").otherwise("N"))
val finalDF = transformedDF.drop("_change_type", "_commit_version", "_commit_timestamp")

// Write to Event Hubs with retry and checkpointing
var retry = true
var retryCount = 0
val maxRetries = 3
while (retry && retryCount < maxRetries) {
  try {
    val stream = finalDF
      .select(to_json(struct(/* column list */)).alias("body"))
      .writeStream
      .format("eventhubs")
      .options(ehWriteConf.toMap)
      .option("checkpointLocation", checkpoint_dir_path)
      .trigger(Trigger.AvailableNow())
      .start()
    stream.awaitTermination()
    retry = false
  } catch {
    case e: Exception =>
      retryCount += 1
      if (retryCount < maxRetries) {
        val delay = 2.seconds * retryCount
        println(s"Stream attempt $retryCount failed, retrying in ${delay.toSeconds} seconds...")
        Thread.sleep(delay.toMillis)
      }
  }
}

// Write checkpoint
val emptyDF = Seq((1)).toDF("seq")
val checkpoint_timestamp = emptyDF.withColumn("current_timestamp", current_timestamp())
  .first().getTimestamp(1) + "+00:00"
dbutils.fs.put(tbl_version_timestamp_path, checkpoint_timestamp.toString(), true)
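For context, the /* column list */ placeholder in the write step stands for the columns being serialized into the event body; with hypothetical column names (the real schema of equip_status_history_table isn't shown here) that select would look like:

// Hypothetical column names for illustration only; the real columns come from
// wencohvc.equip_status_history_table. operation and DeletedIndicator are the
// derived columns built earlier in the notebook.
finalDF
  .select(to_json(struct($"equip_id", $"status", $"operation", $"DeletedIndicator")).alias("body"))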
The problem is that the notebook times out before the last command, so the final checkpoint write never runs. I have tried the retry mechanism shown above, but it still times out. The data volume from the source is huge, and I don't want to stream duplicate data by re-running the notebook when the checkpoint was never written properly. How do I solve this? I expect the job to run and store the last checkpoint timestamp correctly, so that the next run picks up from there, but it times out before the last command (a sketch of the behavior I am after follows the error output below). The error I am getting is:
ERROR: Some streams terminated before this command could finish!
Stream attempt 1 failed, retrying in 2 seconds...
Stream attempt 2 failed, retrying in 4 seconds...
retry: Boolean = true
retryCount: Int = 3
maxRetries: Int = 3
ERROR: Some streams terminated before this command could finish!
Command took 0.04 seconds
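To make the intent concrete, here is a minimal sketch of the guarantee I am after (simplified; streamSucceeded is a hypothetical flag, not something in my current notebook):

// Minimal sketch, not the actual notebook: only advance the timestamp
// checkpoint when the stream finished cleanly, so a timed-out or failed run
// can be retried from the last good checkpoint without duplicating data.
// streamSucceeded is hypothetical; in the notebook above it corresponds to
// the retry loop exiting with retry == false.
val streamSucceeded = !retry
if (streamSucceeded) {
  dbutils.fs.put(tbl_version_timestamp_path, checkpoint_timestamp.toString(), true)
} else {
  println(s"Stream did not finish after $maxRetries attempts; leaving the previous checkpoint in place")
}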