yesterday - last edited yesterday
What I'm trying to achieve: ingest files into bronze tables with Autoloader, then produce Kafka messages for each file ingested using a DLT sink.
The issue: the latency between a file being ingested and its message being produced grows sharply as more tables are added to the pipeline and more files are sent to be ingested simultaneously; even with a low number of tables and a single file for each, latency can exceed a minute.
---
The test pipeline is a very basic one, and it's being run in continuous mode:
import dlt
from pyspark.sql import functions as f

@dlt.table
def bronze_table():
    return (
        spark.readStream.format("cloudFiles")
        # Options
        .load("/path/to/volume")
        .withColumn("created_at", f.current_timestamp())
        .withColumn("file_name", f.col("_metadata.file_path"))
    )

dlt.create_sink("kafka_sink", "kafka", options=kafkaOptions)

@dlt.append_flow(name="kafka_flow", target="kafka_sink")
def kafka_sink():
    return (
        dlt.read_stream("bronze_table")
        .groupBy("file_name")
        .agg(f.first("created_at").alias("created_at"))
    )
The Autoloader source path only has a single, small file. To test, I have a script that creates a new file at the source, starts a Kafka consumer, then measures the time between the file being created and a message with matching data being received.
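Roughly, the measuring script does something like this (assuming the kafka-python client, that the sink's message value is JSON carrying the file name, and placeholder broker/topic/path names):

import json
import time
import uuid
from kafka import KafkaConsumer

# Placeholder broker and topic
consumer = KafkaConsumer("latency-test", bootstrap_servers=["broker:9092"])

# Drop a uniquely named file into the Autoloader source path
file_name = f"test_{uuid.uuid4().hex}.json"
created_at = time.time()
with open(f"/path/to/volume/{file_name}", "w") as fh:
    fh.write(json.dumps({"id": 1}))

# Wait for a message referencing that file and report the latency
for msg in consumer:
    payload = json.loads(msg.value)
    if file_name in payload.get("file_name", ""):
        print(f"latency: {time.time() - created_at:.1f}s")
        break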
To get closer to the actual volume, I add more tables, each with its own append_flow targeting the same sink. So there's one Kafka sink, X tables, and X append_flow invocations all pointing to that sink, roughly as sketched below. But once I reach 10 tables or so and create one file per table at once, the latency of the last message to arrive can exceed a minute.
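The scaled-up wiring looks roughly like this (table names, source paths, and the table count are placeholders for illustration):

import dlt
from pyspark.sql import functions as f

NUM_TABLES = 10  # placeholder scale used for the test

dlt.create_sink("kafka_sink", "kafka", options=kafkaOptions)

def make_bronze_table(table_name, source_path):
    @dlt.table(name=table_name)
    def bronze():
        return (
            spark.readStream.format("cloudFiles")
            # Autoloader options
            .load(source_path)
            .withColumn("created_at", f.current_timestamp())
            .withColumn("file_name", f.col("_metadata.file_path"))
        )

def make_kafka_flow(table_name, flow_name):
    @dlt.append_flow(name=flow_name, target="kafka_sink")
    def flow():
        return (
            dlt.read_stream(table_name)
            .groupBy("file_name")
            .agg(f.first("created_at").alias("created_at"))
        )

for i in range(NUM_TABLES):
    make_bronze_table(f"bronze_table_{i}", f"/path/to/volume/table_{i}")
    make_kafka_flow(f"bronze_table_{i}", f"kafka_flow_{i}")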
CPU and memory usage stay comfortably low. Even so, I've tested several cluster configurations, varying not only resources but also cluster types, and at one point I even matched the number of worker cores to the number of tables to absolutely rule out resource constraints. No discernible difference.
I've also ruled out issues with our Kafka service by producing and consuming messages from a separate Databricks notebook on the same workspace, which also rules out possible networking issues.
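That standalone check was roughly a round trip like this (broker address and topic name are placeholders):

# Produce a test message (placeholder broker and topic)
(
    spark.createDataFrame([("test-key", "test-value")], ["key", "value"])
    .write.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("topic", "latency-test")
    .save()
)

# Read it back and check the broker-side timestamp
(
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "broker:9092")
    .option("subscribe", "latency-test")
    .option("startingOffsets", "earliest")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "timestamp")
    .show(truncate=False)
)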
The only thing I haven't tried yet is splitting ingestion and the sink/producer into two different pipelines, but my test pipeline with 10 tables isn't particularly large to begin with.
Can someone help me find the possible bottleneck in this process?
yesterday
Hi @rcostanza,
I see one serious issue in your approach. Whenever you use groupBy in Spark Structured Streaming (and DLT streaming tables are based on Spark Structured Streaming under the hood), you end up with a stateful operation.
That means:
Spark creates a state store (backed by RocksDB) to track state
It keeps that state until a watermark tells it it’s safe to drop old keys.
Because you didn't specify a watermark, Spark keeps all keys indefinitely.
So over time you accumulate a growing RocksDB state per streaming query. That could explain why it keeps getting slower while CPU consumption stays low: time is lost searching within the state.
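For illustration, a minimal sketch (not your exact code) of how a watermark bounds aggregation state; Spark can only evict old groups when the grouping includes an event-time window, and the durations here are arbitrary examples:

import dlt
from pyspark.sql import functions as f

@dlt.append_flow(name="kafka_flow", target="kafka_sink")
def kafka_sink():
    return (
        dlt.read_stream("bronze_table")
        # Declare the event-time column and how late data may arrive
        .withWatermark("created_at", "10 minutes")
        # Group by an event-time window so expired windows can be dropped from state
        .groupBy(f.window("created_at", "1 minute"), "file_name")
        .agg(f.first("created_at").alias("created_at"))
    )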
I strongly recommend you familiarize yourself with the Spark Structured Streaming guide, where this is explained in much more detail:
Structured Streaming Programming Guide - Spark 4.0.1 Documentation
yesterday
Thanks for your reply
The state store should in theory be very small, considering there are at most 5 test files per table and each has a very low number of rows (100 or so). In any case, in one of my tests I tried to reduce it to a minimum by using a watermark period that should be safe for newly arrived files:
def kafka_sink():
    return (
        dlt.read_stream("bronze_table")
        .withWatermark("created_at", "60 seconds")
        .dropDuplicatesWithinWatermark(["file_name"])
        .select("file_name", "created_at")
    )
But that didn't help.
Also, I've been restarting the tests by clearing the test files and running a full pipeline refresh. In theory, the state should start fresh as well. Even so, there's still significant latency on the next test files created afterwards.
11 hours ago
Hi, I think it's a delay in Autoloader, since it doesn't yet know about the new files. It has nothing to do with state; Autoloader simply keeps a list of processed files. By default Autoloader scans the directory roughly every minute, and the scan itself also takes time. Move the volume to a new external location in a new container, and enable file events (Event Grid) for that external location; this eliminates the scanning interval and the time required to perform the storage scan. Just remember that all file operations for that location go through the Event Grid, so it's good to set it up separately from other locations - https://learn.microsoft.com/en-us/azure/databricks/ingestion/cloud-object-storage/auto-loader/file-n...
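For reference, a related option I'm aware of is switching the Autoloader source itself from directory listing to notification-based discovery; a minimal sketch (the file-events setup on the external location happens outside the code, and the format option here is just an example):

import dlt
from pyspark.sql import functions as f

@dlt.table
def bronze_table():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")            # example format, placeholder
        .option("cloudFiles.useNotifications", "true")  # event-based file discovery instead of listing
        .load("/path/to/volume")
        .withColumn("created_at", f.current_timestamp())
        .withColumn("file_name", f.col("_metadata.file_path"))
    )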
6 hours ago
The files are actually being ingested into bronze within seconds of being created, at least at low volumes - one file per table, created simultaneously, with a small number of rows. I haven't tweaked anything in particular, so maybe file notification was already enabled on the S3 bucket I'm using as the Autoloader source.
Even so, with 10 or so tables in the pipeline, I create 10 files and they're ingested almost instantly, but the Kafka sink sometimes won't produce messages for another several seconds, sometimes minutes.
I managed to reduce the latency a lot by moving the Kafka sink alone to a separate pipeline, roughly as sketched below, but it still isn't as low as I expected for the number of tables and the file volume I'm testing with. And I don't think the original pipeline with 10 bronze tables, 10 silver tables, and the Kafka sink was particularly big.
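The split version of the sink pipeline looks roughly like this (the three-level table name is a placeholder; it streams from the bronze table published by the ingestion pipeline):

import dlt

dlt.create_sink("kafka_sink", "kafka", options=kafkaOptions)

@dlt.append_flow(name="kafka_flow", target="kafka_sink")
def kafka_flow():
    return (
        # Stream from the bronze table published by the ingestion pipeline
        spark.readStream.table("catalog.schema.bronze_table")
        .withWatermark("created_at", "60 seconds")
        .dropDuplicatesWithinWatermark(["file_name"])
        .select("file_name", "created_at")
    )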
I'd like to squeeze that latency as low as possible, but I don't know what else I can try, and I can't see where the bottleneck is.