01-28-2022 02:17 AM
Hi,
While developing an ETL for a large dataset I want to get a sample of the top rows to check that the pipeline "just runs", so I add a limit clause when reading the dataset.
I'm surprised to see that, instead of creating a single task as a show command does, it creates a task for every partition file (I don't know if it is just checking metadata or preloading data, but given the execution time I fear it is the second case).
I was expecting the limit(n) clause to be pushed down to the file scan so that data is loaded from just the number of files required.
Is there a way to ensure this behaviour? Cheers!
Minimal example:
df = (
    spark
    .read.parquet("<path>")
    # Partition filter (placeholder predicate)
    .where("<partition_column> = '<value>'")
    # Get top rows
    .limit(100)
)
df.show()  # <- This triggers a job with a number of tasks equal to the number of files in the partition...
01-28-2022 05:09 AM
Just point it to a folder with one partition.
Additionally, specify a schema to avoid the inferSchema behavior.
If you don't have a schema, you can use samplingRatio=None so it will read only the first row to determine the schema.
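For reference, a minimal sketch of these suggestions; the path, the partition folder name and the two-column schema are placeholders, so adjust them to your layout:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("id", StringType()),
    StructField("value", DoubleType()),
])

sample = (
    spark.read
    .schema(schema)  # explicit schema: no schema inference pass
    .parquet("<path>/<partition_column>=<value>")  # point directly at one partition folder
    .limit(100)
)
sample.show()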
01-28-2022 08:36 AM
Hi Hubert, thanks for the quick response,
> Just point it to a folder with one partition.
This will work, but I'd like a general solution that can be applied without changing the path or having to look inside the folder structure; sometimes you don't have easy access to S3 or other filesystems (when using a catalog).
> Additionally, specify a schema to avoid the inferSchema behavior.
This didn't work for me, at least with parquet files; it still creates the huge number of tasks.
> you can use samplingRatio=None
This didn't work for me either; it seems it still reads metadata/rows from the parquet files.
01-31-2022 08:45 AM
Hi @Jacinto Arias, this is because limit pushdown into the file scan is currently not supported in Spark; see this very good answer.
Actually, take(n) should take a really long time as well. I just tested it, however, and got the same results as you did: take is almost instantaneous regardless of dataset size, while limit takes a lot of time.
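A quick way to reproduce that comparison (the path is a placeholder):
df = spark.read.parquet("<path>")

rows = df.take(100)   # collects 100 rows to the driver; usually returns quickly
df.limit(100).show()  # may still launch a task per input file before showing rows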
02-01-2022 05:04 AM
Thank you very much Kaniz
I now understand that predicate pushdown only works for WHERE clauses on partitions and the internal statistics of parquet files.
It would be interesting, though, to have a nice way to retrieve just a few rows of a highly partitioned dataset without hardcoding the path of the internal parquet files when loading the dataframe.
02-01-2022 12:20 PM
You can also try to read the parquet as a stream with a limit and the trigger-once option.
Then, depending on the environment, set different checkpoints and different limits.
With trigger once you can use streaming to run what are effectively batch jobs and keep the same code. Additionally, cloudFiles (Auto Loader) in Databricks is nice for detecting new files.
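A minimal sketch of that streaming approach; the paths, schema and checkpoint location are placeholders. Note that trigger(once=True) ignores rate-limit options such as maxFilesPerTrigger, so this sketch uses trigger(availableNow=True) (Spark 3.3+), which respects them:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType

schema = StructType([
    StructField("id", StringType()),
    StructField("value", DoubleType()),
])

query = (
    spark.readStream
    .schema(schema)                   # streaming file sources need an explicit schema
    .option("maxFilesPerTrigger", 1)  # pick up at most one file per micro-batch
    .parquet("<path>")
    .writeStream
    .format("parquet")
    .option("checkpointLocation", "<checkpoint-path>")
    .trigger(availableNow=True)       # process the files available now, then stop
    .start("<output-path>")
)
query.awaitTermination()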
03-13-2023 06:34 AM
It's been a while since the question was asked, and in the meantime Delta Lake 2.2.0 hit the shelves with the exact feature the OP asked about, i.e. LIMIT pushdown:
> LIMIT pushdown into Delta scan. Improve the performance of queries containing LIMIT clauses by pushing down the LIMIT into the Delta scan during query planning. The Delta scan uses the LIMIT and the file-level row counts to reduce the number of files scanned, which helps queries read far fewer files and can make LIMIT queries faster by 10-100x depending on the table size.
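For example, a sketch assuming the dataset has been rewritten as a Delta table on Delta Lake 2.2.0 or later (the path is a placeholder):
# The LIMIT is pushed into the Delta scan, so only enough files to satisfy
# the 100 rows should be read.
(
    spark.read.format("delta")
    .load("<delta-table-path>")
    .limit(100)
    .show()
)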