IoT (Internet-of-Things) devices are disrupting the manufacturing industry. While estimates vary, most sources agree that tens of billions of IoT devices are operating in manufacturing in 2025. As valuable as it is to be able to monitor every aspect of operations, all of this data presents a challenge: how do you ingest such a volume of data efficiently? Traditional approaches, such as using UNION, are inflexible, require frequent maintenance, and struggle to keep up with large volumes of high-velocity data. What is needed is a way to ingest this data quickly and unify it so that it can be analyzed and acted upon in a timely manner.
Enter append flows.
Prior to the addition of append flows, the typical approach was to perform a UNION within the SQL code, combining several streams of data in a single query. While that technically worked, it had several disadvantages, including the need to fully refresh the sink table whenever the set of sources changed.
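To make the contrast concrete, here is a rough sketch of that older pattern using the classic Delta Live Tables Python API. The source paths and table name are hypothetical, and the real query could just as easily be written in SQL; the point is that every source lives inside one query, so changing the list of sources means changing (and fully refreshing) the table.

import dlt

# Hypothetical source folders for this sketch.
source_paths = [
    "/Volumes/main/default/iot_data/0",
    "/Volumes/main/default/iot_data/1",
]
schema = "device_id INT, temperature DOUBLE, humidity DOUBLE, event_time TIMESTAMP"

@dlt.table(name="device_events_unioned")
def device_events_unioned():
    # Read each source as a stream, then UNION them all inside a single query.
    streams = [
        spark.readStream
             .schema(schema)
             .format("cloudFiles")
             .option("cloudFiles.format", "json")
             .load(path)
        for path in source_paths
    ]
    combined = streams[0]
    for s in streams[1:]:
        combined = combined.unionByName(s)
    return combined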
Ultimately, append flows make possible what was envisioned with the creation of Delta Lake. Delta Lake was designed to enable concurrent append and read operations. That said, until the append flow capability arrived, there was no native way to take advantage of this using Delta Live Tables.
With append flows, Lakeflow Declarative Pipelines provides a simple way to combine multiple streams of data into a single streaming table. This capability is ideal for any use case where data from multiple sources needs to be combined into a single object.
When combining multiple data sources into one sink, append flows have several advantages over the union operator: each source is ingested incrementally as its own flow, sources can be added or removed without fully refreshing the target table, and each flow can be monitored on its own.
Let’s create a very basic example that reads JSON files from a series of separate folders, and, using append flows, combines them into one streaming table.
First, we’re going to start by creating a new volume in Unity Catalog.
In this case, I’m going to use “iot_data” for my volume, but you can name it whatever you want. You can create this as a managed volume, or as an external volume. Either one will work with append flows.
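If you'd rather create the volume in code than through the UI, a one-liner like the following should work. The main catalog and default schema are assumptions chosen to match the paths used later in this post, so adjust them to your own workspace:

# Create a managed volume to hold the incoming IoT files.
# The catalog and schema names here are assumptions; change them to match yours.
spark.sql("CREATE VOLUME IF NOT EXISTS main.default.iot_data")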
Now that we have the volume created, let's add some data. In the real world, of course, you'd need to address concerns about throughput and security, but for the purposes of showcasing append flows in this blog, we're going to assume you've solved those issues and that your data is already landing in a volume in Databricks:
import time, uuid, json, random
from datetime import datetime

# Create subdirectories for each sensor
base_path = "/Volumes/main/default/iot_data"

# Generate folders for each of the IoT devices we're working with.
for device_id in range(5):
    try:
        dbutils.fs.ls(f"{base_path}/{device_id}")
    except Exception:
        dbutils.fs.mkdirs(f"{base_path}/{device_id}")

# Generate a single sensor reading (as a dict) for a device.
def generate_iot_data(device_id):
    return {
        "device_id": int(device_id),
        "temperature": random.uniform(10, 50),
        "humidity": random.uniform(30, 80),
        "event_time": datetime.now().isoformat()
    }

# Start data generation loop for 10 minutes, writing one JSON file per reading.
start_time = time.time()
while (time.time() - start_time) < 600:
    for device_id in range(5):
        data = generate_iot_data(device_id)
        unique_filename = f"{base_path}/{device_id}/{uuid.uuid4().hex}.json"
        dbutils.fs.put(unique_filename, json.dumps(data), overwrite=True)
It’s outside the scope of this blog to walk through the entirety of the above code, but one thing you may need to change is the base_path. You can fetch the base_path for your newly created volume by browsing to the catalog, clicking the kebab menu to the right of the iot_data volume, and selecting “Copy path to clipboard”. Replace the current base_path with the copied value.
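Once the generator has been running for a minute or so, it's worth a quick sanity check that files are actually landing where you expect. Something like this in a notebook cell will do, again assuming the same base_path:

# List the per-device folders, then peek at the JSON files for device 0.
base_path = "/Volumes/main/default/iot_data"
display(dbutils.fs.ls(base_path))
display(dbutils.fs.ls(f"{base_path}/0"))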
Next, we’ll create a new ETL pipeline to ingest the data.
Navigate to Jobs & Pipelines, click the Create dropdown, and select ETL Pipeline.
You’ll see a dialog box appear. Choose a folder path, and click Select.
This will create a new pipeline along with a blank transformation.
Notice in the image above that the my_transformation.py file is excluded from the pipeline currently. We want to include it in the pipeline, because this will be the code the pipeline will use to ingest the data. Click where it says “Excluded”, and change it to “Included”.
Next, paste the code below in the my_transformation.py file, and adjust the base path appropriately.
I’ve added comments for clarity, but in a nutshell, we’re gathering a list of subfolders from a target volume, and then building a separate LDP flow for each of the subfolders.
from pyspark import pipelines as dp

# The path to the root of the volume where the iot_data will be dropped.
base_path = "/Volumes/main/default/iot_data"

# List all subdirectories under the base path.
folders = [f.path for f in dbutils.fs.ls(base_path) if f.isDir()]

# Define the schema to be used for the new streaming table. We will also use this
# when we are ingesting JSON files from the volumes.
schema = "device_id INT, temperature DOUBLE, humidity DOUBLE, event_time TIMESTAMP"

# We need to create the streaming table before we start ingesting data.
# As a note, I'm using device_id as the partition column. Doing so will
# help to increase query responsiveness when filtering on device_id.
dp.create_streaming_table(
    name="device_events",
    schema=schema,
    partition_cols=["device_id"]
)

# Finally, we're going to iterate through the folders and create an
# append flow for each of them. Care should be taken to ensure that each
# flow within the pipeline has a unique, but deterministic name.
for idx, folder in enumerate(folders):
    @dp.append_flow(target="device_events", name=f"flow_{idx}")
    def _flow(folder=folder):
        return (
            spark.readStream
                .schema(schema)
                .format("cloudFiles")
                .option("cloudFiles.format", "json")
                .load(folder)
        )
Once your pipeline is in place, you can run it either as a triggered pipeline or as a continuous pipeline.
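If you'd rather start a triggered update programmatically instead of from the UI, a sketch like the following using the Databricks SDK for Python should do it. The pipeline ID is a placeholder, and I'm assuming the SDK is installed and authenticated in your environment:

from databricks.sdk import WorkspaceClient

w = WorkspaceClient()

# Kick off a triggered update of the pipeline. The ID below is a placeholder;
# you can copy yours from the pipeline's settings page.
w.pipelines.start_update(pipeline_id="<your-pipeline-id>")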
You’ll notice that the running pipeline looks about as simple as can be, having just one table: device_events.
Below this, you’ll see a tab called “Performance”. If you click it, you’ll find a list of all the queries associated with the pipeline, along with their current status, allowing you to explore the performance of importing a specific device’s data.
That was easy! You are now ingesting five distinct sets of IoT data points and merging them into one central table, all in just a few short lines of code.
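As a quick check that events from every device are arriving, you can query the streaming table directly. Here I'm assuming the pipeline publishes to the main catalog and default schema, so adjust the three-level name to match your pipeline settings:

# Count ingested events per device. Adjust the catalog and schema to wherever
# your pipeline publishes the device_events table.
display(
    spark.sql("""
        SELECT device_id, COUNT(*) AS events
        FROM main.default.device_events
        GROUP BY device_id
        ORDER BY device_id
    """)
)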
There are a few things to know about setting things up like this:
Lakeflow Declarative Pipelines append flows offer a scalable solution for unifying multiple IoT data streams. They enable the seamless combination of multiple sources without the drawbacks of traditional union operations, simplifying both ingestion and ongoing maintenance of streaming datasets. This approach not only leverages incremental updates but also adapts easily to an environment where devices are added and removed, making it well-suited to the dynamic nature of modern IoT deployments. As manufacturers continue to embrace data-driven strategies at scale, append flows are a tool you should have in your toolkit for attaining operational agility in the face of ever-changing data volumes.