02-20-2023 11:55 PM
Hello everyone!
I am trying to read a Delta table as a streaming source with Spark, but my micro-batches are unbalanced: one is very small and the others are huge. How can I limit this?
I tried different configurations of maxBytesPerTrigger and maxFilesPerTrigger, but nothing changes; the batch size is always the same.
Any ideas?
# Read the Delta table as a streaming source
df = spark \
    .readStream \
    .format("delta") \
    .load("...")

# Write the stream out in append mode with a checkpoint location
df \
    .writeStream \
    .outputMode("append") \
    .option("checkpointLocation", "...") \
    .table("...")
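For reference, this is roughly how I tried to apply the options on the reader (the values here are just examples):
df = spark \
    .readStream \
    .format("delta") \
    .option("maxFilesPerTrigger", 10) \
    .option("maxBytesPerTrigger", "1g") \
    .load("...")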
Kind Regards
02-21-2023 04:12 AM
Besides the parameters you mention, I don't know of any others that control the batch size.
Did you check whether the Delta table is badly skewed?
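A quick way to get a rough sense of file-level skew is to count rows per underlying file with a batch read of the same table. This is just a sketch (the path is hypothetical; input_file_name() simply reports which file each row came from):
from pyspark.sql.functions import input_file_name

# Rows per data file: a few huge counts next to many tiny ones indicates skew
spark.read.format("delta").load("/path/to/delta_table") \
    .withColumn("file", input_file_name()) \
    .groupBy("file").count() \
    .orderBy("count", ascending=False) \
    .show(truncate=False)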
02-27-2023 08:52 AM
Thanks, you are right! The data was very skewed.
02-22-2023 02:42 AM
Hi @Yuliya Valava, if you read a Delta table as a stream in PySpark, you can limit the input rate by setting the maxFilesPerTrigger option.
This option controls the maximum number of new files processed in a single trigger interval. By reducing this value, you can limit the input rate and manage the data processed in each batch.
Here's an example of how to limit the input rate when reading a Delta table as a stream:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("myApp").getOrCreate()

# Limit the input rate to 1 file per trigger interval.
# The option must be set on the stream reader, before load().
df = (
    spark.readStream
    .format("delta")
    .option("maxFilesPerTrigger", 1)
    .load("/path/to/delta_table")
)

# Process the stream: write to the console and block until termination
query = df.writeStream.format("console").start()
query.awaitTermination()
In this example, we read the Delta table as a stream and set the maxFilesPerTrigger option on the reader to 1, limiting the input rate to one file per trigger interval.
Finally, we write the stream to the console and block on query.awaitTermination().
Note that maxFilesPerTrigger limits the number of files, not their size, so it may not effectively cap the batch size if your Delta table contains a few very large files, or if the files are heavily compressed (the decompressed data can be much larger than the on-disk size).
In such cases, you may need to rewrite the data into smaller files or use a size-based limit instead.
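If a size-based limit fits better, maxBytesPerTrigger can be set on the same reader; it is a soft cap, so a batch may still exceed it when the smallest input unit (a single file) is larger than the limit. A minimal sketch, reusing the hypothetical path from above:
# Soft cap on the amount of data read per micro-batch
df = (
    spark.readStream
    .format("delta")
    .option("maxBytesPerTrigger", "512m")
    .load("/path/to/delta_table")
)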
02-22-2023 02:50 AM
Hi @Yuliya Valava, if you are setting the maxBytesPerTrigger and maxFilesPerTrigger options when reading a Delta table as a stream but the batch size is not changing, there could be a few reasons for this.
To determine whether maxBytesPerTrigger or maxFilesPerTrigger is being applied correctly, you can try setting them to very low values (e.g., 1 or 10) to see if the batch size changes significantly.
You can also monitor the number of files or bytes processed per batch using the Databricks UI or by adding logging statements to your code.
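For example, once the query is running, query.lastProgress and query.recentProgress expose per-batch metrics such as numInputRows; a quick sketch, assuming query is the StreamingQuery handle returned by writeStream...start():
# Metrics for the most recent micro-batch (None until the first batch completes)
progress = query.lastProgress
if progress is not None:
    print(progress["batchId"], progress["numInputRows"])

# Or scan the last few batches to see how balanced they are
for p in query.recentProgress:
    print(p["batchId"], p["numInputRows"])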
If you are still having trouble controlling the batch size, you may want to repartition or compact your data into more evenly sized files before streaming from it, or use other techniques to optimize your processing logic.