How to process images and video through structured streaming using Delta Lake?
06-25-2021 01:37 PM
Can we scan through videos and identify and alert in real time if something goes wrong? What are the best practices for this kind of use case?
Labels: Best Practices
07-30-2021 08:59 AM
Yes, you can use Delta Lake to process images. With video processing you would typically process each frame of the video, which is very similar to image processing.
To do what you are describing, you would either stream the image data through a message bus that Databricks can read, or drop the image/video files into cloud storage and load them into a streaming DataFrame.
Once you have a streaming DataFrame, you would likely use a foreachBatch function to score each image/frame with the ML/DL model you previously trained, and raise any alerts you are interested in.
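As a sketch of the cloud-storage ingestion path, assuming files land under an illustrative `/mnt/landing/images` directory (Auto Loader's `cloudFiles` source is Databricks-specific; in open-source Spark 3.0+ you can use `format("binaryFile")` directly instead):

```scala
import org.apache.spark.sql.DataFrame

// Illustrative landing path for incoming image/frame files (assumption).
val imagePath = "/mnt/landing/images"

// Incrementally pick up newly arriving files as a streaming DataFrame.
// Each row carries the file path, modification time, length, and raw bytes
// in a binary `content` column.
val imageStream: DataFrame = spark.readStream
  .format("cloudFiles")                      // Databricks Auto Loader
  .option("cloudFiles.format", "binaryFile") // treat each file as binary content
  .load(imagePath)
```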
Code below:
// function to upsert data from silver to gold
import io.delta.tables.DeltaTable

def upsertBatchData(microBatchDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], batchId: Long): Unit = {
  // apply transformations as needed, then write the data
  if (DeltaTable.isDeltaTable(gold_data)) {
    val deltaTable = DeltaTable.forPath(spark, gold_data) // target Delta table for the upsert
    (deltaTable.alias("delta_table")
      .merge(microBatchDF.alias("updates"), "updates.word = delta_table.word") // join the 'updates' micro-batch with the Delta table on the key
      .whenMatched().updateAll()    // key matched: update all columns
      .whenNotMatched().insertAll() // no match: insert all columns
      .execute())
  } else {
    // first run: the gold table does not exist yet, so create it
    microBatchDF.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(gold_data)
  }
}
import org.apache.spark.sql.streaming.Trigger

val silverDF = spark.readStream.format("delta").option("ignoreChanges", "true").load(silver_data) // read the silver table as a stream

// write the silver stream, upserting each micro-batch into the gold table
// (no sink format is set here because foreachBatch defines the sink)
silverDF.writeStream
  .option("checkpointLocation", silver_checkpoint)
  .trigger(Trigger.Once())         // process all available data once, then stop
  .foreachBatch(upsertBatchData _) // apply the upsert function to each micro-batch
  .outputMode("update")
  .start()

display(spark.read.format("delta").load(gold_data)) // inspect the resulting gold table
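The code above shows the upsert pattern; the scoring step mentioned earlier would look similar. A minimal sketch, assuming input rows come from a `binaryFile` source (so they carry a binary `content` column), with a placeholder `scoreFrame` UDF standing in for your trained model and an illustrative output path:

```scala
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.{col, udf}

// Placeholder scoring function; replace with a call into your trained
// ML/DL model (assumption: it returns an alert score per image/frame).
val scoreFrame = udf((bytes: Array[Byte]) => if (bytes.length > 0) 0.1 else 0.0)

def scoreBatch(microBatchDF: Dataset[Row], batchId: Long): Unit = {
  val scored = microBatchDF.withColumn("alert_score", scoreFrame(col("content")))
  // Persist scores; a downstream job (or this one) can alert when
  // alert_score crosses a threshold.
  scored.write.format("delta").mode("append").save("/mnt/gold/alerts") // illustrative path
}
```

You would pass `scoreBatch _` to `foreachBatch` in the same way `upsertBatchData _` is used above.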

12-14-2021 01:23 PM
Maybe I'm a little off topic, but can you recommend companies that do video production? I want to make an explainer video for my site.

