Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to limit the input rate when reading a Delta table as a stream?

Lulka
New Contributor II

Hello everyone!

I am trying to read a Delta table as a streaming source with Spark, but my micro-batches are unbalanced: some are very small and others are very large. How can I limit this?

I have tried different configurations with maxBytesPerTrigger and maxFilesPerTrigger, but nothing changes; the batch sizes stay the same.

Any ideas?

df = spark \
  .readStream \
  .format("delta") \
  .load("...")

df \
  .writeStream \
  .outputMode("append") \
  .option("checkpointLocation", "...") \
  .table("...")

Kind Regards


4 REPLIES

-werners-
Esteemed Contributor III (Accepted Solution)

Besides the parameters you mention, I don't know of any others that control the batch size.

Did you check whether the Delta table is horribly skewed?
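One quick way to check for that kind of skew is to batch-read the same table and count rows per underlying file; a minimal sketch (the path is a placeholder for your table location):

from pyspark.sql.functions import input_file_name, count

# Batch-read the same Delta table and count how many rows each underlying file contributes
df = spark.read.format("delta").load("/path/to/delta_table")

(df.groupBy(input_file_name().alias("file"))
   .agg(count("*").alias("rows"))
   .orderBy("rows", ascending=False)
   .show(20, truncate=False))

If a handful of files hold most of the rows, the uneven micro-batches come from the table layout rather than from the reader options.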

Lulka
New Contributor II

Thanks, you are right! The data was very skewed.

Kaniz
Community Manager

Hi @Yuliya Valava, if you read a Delta table as a stream in PySpark, you can limit the input rate by setting the maxFilesPerTrigger option.


This option controls the maximum number of new files processed in a single trigger interval. By reducing this value, you can limit the input rate and manage the data processed in each batch.

Here's an example of how to limit the input rate when reading a Delta table as a stream:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("myApp").getOrCreate()

# Limit the input rate to 1 file per trigger interval.
# Rate-limit options must be set on the stream reader, before load().
df = (spark.readStream
      .format("delta")
      .option("maxFilesPerTrigger", 1)
      .load("/path/to/delta_table"))

# Process the stream and print each micro-batch to the console
query = df.writeStream.format("console").start()

query.awaitTermination()

In this example, we read the Delta table as a stream and set the maxFilesPerTrigger option to 1 on the stream reader, which limits each micro-batch to a single input file.

Finally, we write the stream to the console and block until the query finishes with query.awaitTermination().

Note that maxFilesPerTrigger limits the number of files per batch, not their size, so it may not be effective if your Delta table contains a few very large files.

In such cases, you may need to rewrite the table into smaller files before streaming from it.
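For instance, a full rewrite with a higher partition count splits the data into more, smaller files; a rough sketch, assuming a complete rewrite is acceptable (the paths and the partition count are placeholders):

# Rewrite the table into smaller files so that file-based rate limits behave more predictably
df = spark.read.format("delta").load("/path/to/delta_table")

(df.repartition(200)            # pick a count that yields roughly uniform, modest file sizes
   .write
   .format("delta")
   .mode("overwrite")
   .save("/path/to/delta_table_smaller_files"))

You would then point the streaming read at the rewritten location.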

Kaniz
Community Manager

Hi @Yuliya Valava, if you are setting the maxBytesPerTrigger and maxFilesPerTrigger options when reading a Delta table as a stream but the batch size is not changing, there could be a few reasons for this:

  1. The input data rate is not exceeding the limits set by maxBytesPerTrigger or maxFilesPerTrigger. These options cap the maximum amount of data processed in a single batch; if the incoming rate is already below those caps, the batch size will not change.
  2. The data in your Delta table is already evenly partitioned, with each partition containing roughly the same amount of data. In that case, even a low maxBytesPerTrigger or maxFilesPerTrigger may not change the batch size much.
  3. Other factors, such as the available resources in your cluster or the complexity of your processing logic, can also influence the batch size regardless of the values set for maxBytesPerTrigger or maxFilesPerTrigger.

To determine whether maxBytesPerTrigger or maxFilesPerTrigger are being applied correctly, you can try setting them to very low values (e.g., 1 or 10) to see if the batch size changes significantly.

You can also monitor the number of files or bytes processed per batch using the Databricks UI or by adding logging statements to your code.

If you are still having trouble controlling the batch size, you may want to consider repartitioning your data before processing it, or using other techniques to optimize your processing logic.
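For example, you could set a deliberately low limit and log each micro-batch's progress; a rough sketch (the paths are placeholders):

import time

# Set a deliberately low limit so its effect on batch size is obvious
df = (spark.readStream
      .format("delta")
      .option("maxFilesPerTrigger", 1)          # or .option("maxBytesPerTrigger", "10m")
      .load("/path/to/delta_table"))

query = (df.writeStream
         .format("console")
         .option("checkpointLocation", "/tmp/rate_limit_test_checkpoint")   # throwaway checkpoint for the test
         .start())

# Sample the streaming query's progress metrics to see how many rows each micro-batch processed
for _ in range(10):
    time.sleep(10)
    progress = query.lastProgress
    if progress:
        print(progress["batchId"], progress["numInputRows"])

query.stop()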
