cancel
Showing results for 
Search instead for 
Did you mean: 
Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
cancel
Showing results for 
Search instead for 
Did you mean: 
davidmorton
Databricks Employee
Databricks Employee

IoT (Internet-of-Things) devices are disrupting the manufacturing industry. While estimates vary, most sources agree that there are tens of billions of IoT devices currently operating in manufacturing in 2025. As advantageous as this is to being able to monitor every aspect of operations, all this data presents a challenge around how to ingest such a volume of data efficiently. Traditional approaches, such as using UNION, are inflexible, require frequent maintenance, and struggle keeping up with large volumes of high-velocity data. What is needed is a way to ingest this data quickly, and unify it so that it can be analyzed and acted upon in a timely manner. 

Enter append flows.

Prior to the addition of append flows, the typical approach has been to perform a UNION within the SQL code, joining several streams of data. While that technically worked, it had several disadvantages, including the need for the sink table to be fully refreshed. 

Ultimately, append flows make possible what is envisioned with the creation of Delta Lake. Delta Lake was designed to enable concurrent append and read operations. That said, until the append flow capability came out, there was no native way to do this using Delta Live Tables. 

With append flows, Lakeflow Declarative Pipelines provides a simple way to combine multiple streams of data into one single streaming table. This capability is ideal for any use case where data from multiple sources needs to be combined into a single object.

davidmorton_0-1759929989946.png

When combining multiple data sources into one sink, using append flows is better in many ways than using the union operator. 

  1. Append flows are inherently scalable, and take advantage of automatic optimization through the Lakeflow Declarative Pipeline system
  2. Append flows are more performant than union, and do not require a full refresh of the table. This means it can take full advantage of incremental data updates. 
  3. Append flows are highly flexible, allowing individual flows to be dynamically added and removed without having to completely rebuild the sink table.
  4. Append flows allow multiple pipelines to append to a single table, enabling sink tables that can pull data from more than 1000 sources, which is the current maximum number of flows possible within a single pipeline.

Let’s create a very basic example that reads JSON files from a series of separate folders, and, using append flows, combines them into one streaming table.

Building a Small Pipeline Using Append Flows

First, we’re going to start by creating a new volume in Unity Catalog.

davidmorton_1-1759930038038.png

In this case, I’m going to use “iot_data” for my volume, but you can name it whatever you want. You can create this as a managed volume, or as an external volume. Either one will work with append flows. 

Now that we have the volume created, let’s add some data. In the real world, of course, you’d need to address concerns about throughput and security, but for the purposes of showcasing append flows in this blog, we’re going to assume you’ve solved those issues and you have your data being added to a volume in Databricks already:

import time, uuid, json, random
from datetime import datetime

# Create subdirectories for each sensor
base_path = "/Volumes/main/default/iot_data"

# Generate folders for each of the IoT devices we're working with.
for device_id in range(5):
   try:
       dbutils.fs.ls(f"{base_path}/{device_id}")
   except:
       dbutils.fs.mkdirs(f"{base_path}/{device_id}")

# Function to generate a DataFrame for a single device
def generate_iot_data(device_id):
   return {
       "device_id": int(device_id),
       "temperature": random.uniform(10, 50),
       "humidity": random.uniform(30, 80),
       "event_time": datetime.now().isoformat()
   }

# Start data generation loop for 10 minutes
start_time = time.time()
while (time.time() - start_time) < 600:
   for device_id in range(5):
       data = generate_iot_data(device_id)
       unique_filename = f"{base_path}/{device_id}/{uuid.uuid4().hex}.json"
       dbutils.fs.put(unique_filename, json.dumps(data), overwrite=True)

It’s outside the scope of this blog to walk through the entirety of the above code, but one thing you may have to change is the base_path. You can fetch the base_path for your newly created volume by browsing to the catalog, clicking the kebab on the right of the iot_data volume, and clicking “Copy path to clipboard”. You’ll want to replace the current base path with the value in the clipboard.

Next, we’ll create a new ETL pipeline to ingest the data. 

Navigate to Jobs & Pipelines, and then press the create dropdown. Select ETL Pipeline. 

davidmorton_2-1759930130727.png

You’ll see a dialog box appear. Choose a folder path, and press select.

davidmorton_3-1759930154656.png

This will create a new pipeline along with a blank transformation. 

 

davidmorton_4-1759930166135.png

Notice in the image above that the my_transformation.py file is excluded from the pipeline currently. We want to include it in the pipeline, because this will be the code the pipeline will use to ingest the data. Click where it says “Excluded”, and change it to “Included”. 

Next, paste the code below in the my_transformation.py file, and adjust the base path appropriately. 

I’ve added comments for clarity, but in a nutshell, we’re gathering a list of subfolders from a target volume, and then building a separate LDP flow for each of the subfolders.

from pyspark import pipelines as dp
import os

# The path to the root of the volume where the iot_data will be dropped.
base_path = "/Volumes/main/default/iot_data"

# List all subdirectories under the base path
folders = [f.path for f in dbutils.fs.ls(base_path) if f.isDir()]

# Define the schema to be used for the new streaming table. We will also use this
# when we are ingesting JSON files from the volumes.
schema = "device_id INT, temperature DOUBLE, humidity DOUBLE, event_time TIMESTAMP"

# We need to create the streaming table before we start ingesting data.
# As a note, I'm using the device_id as the partition column. Doing so will
# help to increase query responsiveness when filtering on device_id.
dp.create_streaming_table(
   name="device_events",
   schema=schema,
   partition_cols=["device_id"]
)

# Finally, we're going to iterate through the folders and and create an
# append flow for each of them. Care should be taken to ensure that each
# flow within the pipeline has a unique, but deterministic name.
for idx, folder in enumerate(folders):
   @DP.append_flow(target="device_events",
                   name=f"flow_{idx}")
   def _flow(folder=folder):
       return (
           spark.readStream
               .schema(schema)
               .format("cloudFiles")
               .option("cloudFiles.format", "json")
               .load(folder)
       )

Once your pipeline is in place, you can run it either as a triggered pipeline or as a continuous pipeline.

You’ll notice that the running pipeline looks about as simple as can be, having just one table: device_events.

davidmorton_5-1759930246897.png

Below this, you’ll see a tab called “Performance”. If you click it, you’ll find a list of all the queries associated with the pipeline, along with their current status, allowing you to explore the performance of importing a specific device’s data. 

davidmorton_6-1759930246898.png

That was easy! You are now ingesting five distinct sets of IoT data points, and merging them into one central table, and all that in just a few short lines of code. 

Things To Be Aware Of

There are a few things to know about setting things up like this:

  1. Restarting the pipeline will automatically pick up new IoT devices, should there be a new folder, and backfill the data from those new devices, while still keeping track of what has already been ingested. This makes this approach ideal for any situation where you may be adding new devices dynamically. 
  2. Lakeflow Declarative Pipelines are limited to 1,000 flows, so if your IoT ecosystem is massive, you may need to do some preemptive planning to avoid running into this limit in production. 
  3. The good news regarding #2 above, is that, using append flows, multiple pipelines can be created to feed data into a single target sink, making it ideal for adding additional sources without disrupting current flows.

Conclusion

Lakeflow Declarative Pipelines append flows offer a scalable solution for unifying multiple IoT data streams. It enables seamless combination of multiple sources without the drawbacks of traditional union operations, thus simplifying both ingestion and ongoing maintenance of streaming datasets. This approach not only leverages incremental updates but is highly adaptable to an environment where devices are added and removed, making it well-suited to the dynamic nature of modern IoT deployments. As manufacturers continue to embrace data-driven strategies at scale, append flows is a tool you should have in your toolkit for attaining operational agility in the face of ever-changing data volumes.