Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Spark predicate pushdown on parquet files when using limit

JacintoArias
New Contributor III

Hi,

While developing an ETL pipeline for a large dataset, I want to get a sample of the top rows to check that the pipeline "just runs", so I add a limit clause when reading the dataset.

I'm surprised to see that, instead of creating a single task as a show command does, it creates a task for every partition file (I don't know whether it is just checking metadata or preloading data, but given the execution time I fear it is the latter).

I was expecting the limit(n) clause to be pushed down to the filesystem so that data is loaded from only the number of files required.

Is there a way to ensure this behaviour? Cheers!

Minimal example:

df = (
    spark
        .read.parquet("<path>")
        # Partition filter
        .where("<partition_filter>")
        # Get top rows
        .limit(100)
)

df.show()  # <- This triggers a job with a number of tasks equal to the number of files in the partition...

5 REPLIES

Hubert-Dudek
Esteemed Contributor III

Just point it to a folder with one partition.

Additionally, specify the schema to avoid inferSchema behavior.

If you don't have the schema, you can use samplingRatio=None so it will read only the first row to determine the schema.
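A minimal sketch of that suggestion, assuming a dataset partitioned by a hypothetical date column and illustrative column names (none of these come from the original post):

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Explicit schema avoids schema inference work (hypothetical columns)
schema = StructType([
    StructField("id", LongType()),
    StructField("value", StringType()),
])

sample = (
    spark.read
        .schema(schema)
        # Point directly at a single partition folder so only its files are listed
        .parquet("<path>/date=2023-01-01")
        .limit(100)
)

sample.show()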

Hi Hubert, thanks for the quick response,

> Just point it to a folder with one partition.

This will work, but I'd like a general solution that can be applied without changing the path or having to look inside the folder structure; sometimes you don't have easy access to S3 or other filesystems (when using a catalog).

> Additionally, specify the schema to avoid inferSchema behavior.

This didn't work for me, at least with parquet files; it still creates the huge number of tasks.

> you can use samplingRatio=None

This didn't work for me either; it seems it still reads metadata/rows from the parquet files.

Thank you very much, Kaniz.

I now understand that predicate pushdown only works for WHERE clauses on partition columns and the internal statistics of parquet files.

It would be interesting, though, to have a nice way to retrieve just a few rows of a highly partitioned dataset without hardcoding the path of internal parquet files when loading the dataframe.
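To illustrate that, a minimal sketch assuming hypothetical date (partition) and id columns: the partition filter prunes whole directories at planning time, the column predicate can let the parquet reader skip row groups via their min/max statistics, but a bare limit(n) on its own does neither.

sample = (
    spark.read.parquet("<path>")
        # Partition filter: prunes partition directories at planning time
        .where("date = '2023-01-01'")
        # Column predicate: pushed down to the parquet reader,
        # which can skip row groups using min/max statistics
        .where("id < 1000")
        .limit(100)
)

sample.show()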

Hubert-Dudek
Esteemed Contributor III

You can also try to read the parquet files as a stream with a limit and the trigger-once option.

Then, depending on the environment, set different checkpoints and different limits.

With trigger once you can use streaming to run what are effectively batch jobs and reuse the same code. Additionally, cloudFiles (Auto Loader) in Databricks is nice for detecting new files.
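A rough sketch of that approach on plain parquet files, with placeholder paths and hypothetical columns (Auto Loader would use format("cloudFiles") instead of the parquet source):

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Streaming file sources require an explicit schema (hypothetical columns)
schema = StructType([
    StructField("id", LongType()),
    StructField("value", StringType()),
])

stream = (
    spark.readStream
        .schema(schema)
        .option("maxFilesPerTrigger", 1)   # limit how many files each micro-batch reads
        .parquet("<path>")
)

(
    stream.writeStream
        # availableNow is the newer variant of trigger-once that still respects maxFilesPerTrigger
        .trigger(availableNow=True)
        .option("checkpointLocation", "<checkpoint-path>")
        .format("delta")
        .start("<output-path>")
)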

JacekLaskowski
New Contributor III

It's been a while since the question was asked, and in the meantime Delta Lake 2.2.0 hit the shelves with the exact feature the OP asked about, i.e. LIMIT pushdown:

> LIMIT pushdown into Delta scan. Improve the performance of queries containing LIMIT clauses by pushing down the LIMIT into Delta scan during query planning. Delta scan uses the LIMIT and the file-level row counts to reduce the number of files scanned which helps the queries read far less number of files and could make LIMIT queries faster by 10-100x depending upon the table size.
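So, on a table stored as Delta (the path below is a placeholder), a plain limit query can benefit from that pushdown:

# With Delta Lake >= 2.2.0 the LIMIT is pushed into the Delta scan,
# which uses file-level row counts to skip most of the files.
sample = (
    spark.read.format("delta")
        .load("<delta-table-path>")
        .limit(100)
)

sample.show()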
