Data Engineering

How to process images and video through Structured Streaming using Delta Lake?

Srikanth_Gupta_
Valued Contributor

Can we scan through videos and identify and alert in real time if something goes wrong? What are the best practices for this kind of use case?

2 REPLIES

Ryan_Chynoweth
Honored Contributor III

Yes, you can use Delta Lake to process images. With video, you would typically process each frame individually, which makes it very similar to image processing.

For your use case, you would either stream the image data through a service bus that Databricks can read from, or drop the image/video files into cloud storage and load them into a streaming DataFrame.
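For the cloud-storage option, ingesting image files as a streaming DataFrame might look like the sketch below. This uses Databricks Auto Loader (the cloudFiles source) with the binaryFile format, which reads each file as raw bytes plus metadata; the landing path and glob filter are placeholders you would replace with your own:

```scala
// Sketch: stream image files landing in cloud storage into a DataFrame.
// The path and filter below are placeholders, not values from this thread.
val imagesDF = spark.readStream
    .format("cloudFiles")                          // Databricks Auto Loader source
    .option("cloudFiles.format", "binaryFile")     // each row = one file (path, modificationTime, length, content)
    .option("pathGlobFilter", "*.jpg")             // only pick up image files
    .load("/mnt/raw/images")                       // placeholder landing path
```

Each row of imagesDF then carries the file's bytes in the content column, ready to be decoded and scored downstream.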

Once you have a streaming DataFrame, you would likely want to use a foreachBatch function to score each image/frame with the ML/DL model you previously trained, so you can raise whatever alerts you are interested in.

Code below (note the imports, which the snippet needs to compile):

import io.delta.tables.DeltaTable
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.streaming.Trigger

// function to upsert micro-batches of data from silver to gold
def upsertBatchData(microBatchDF: Dataset[Row], batchId: Long) = {
    // apply transformations as needed

    // write the data
    if (DeltaTable.isDeltaTable(gold_data)) {
        val deltaTable = DeltaTable.forPath(spark, gold_data) // target delta table for the upsert

        (deltaTable.alias("delta_table")
        .merge(microBatchDF.alias("updates"), "updates.word = delta_table.word") // join the micro-batch 'updates' with the delta table on the key column
        .whenMatched().updateAll()    // key matched: update all columns
        .whenNotMatched().insertAll() // no match: insert all columns
        .execute())
    } else {
        // first run: the gold table does not exist yet, so create it
        microBatchDF.write.format("delta").mode("overwrite").option("mergeSchema", "true").save(gold_data)
    }
}

val silverDF = spark.readStream.format("delta").option("ignoreChanges", "true").load(silver_data) // read the silver table as a stream

// upsert the silver data into gold; the sink is defined by foreachBatch, so no format() is needed
silverDF.writeStream
    .option("checkpointLocation", silver_checkpoint)
    .trigger(Trigger.Once())
    .foreachBatch(upsertBatchData _)
    .outputMode("update")
    .start()

display(spark.read.format("delta").load(gold_data))
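The merge above demonstrates the foreachBatch mechanics on a generic key. For the image/video use case itself, the foreachBatch body would instead score each frame and write out any alerts. A minimal sketch, where loadedModel and the alert table path are assumptions (substitute your own trained model, e.g. a Spark ML PipelineModel loaded via MLflow, and your own sink):

```scala
import org.apache.spark.sql.{Dataset, Row}

// Sketch: score each micro-batch of frames and append flagged rows to an alert table.
// `loadedModel` and "/mnt/gold/alerts" are hypothetical, not part of the original reply.
def scoreBatch(microBatchDF: Dataset[Row], batchId: Long): Unit = {
  val scored = loadedModel.transform(microBatchDF) // apply the previously trained model
  scored
    .filter("prediction = 1")                      // keep only frames flagged as anomalous
    .write.format("delta").mode("append")
    .save("/mnt/gold/alerts")                      // placeholder alert table path
}
```

You would pass scoreBatch to foreachBatch exactly as upsertBatchData is passed above, and point a downstream alerting job (or another stream) at the alert table.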

Anonymous
Not applicable

Maybe I'm a little off topic, but can you recommend companies that are engaged in video production? I want to make an explanatory video for my site.
