Does partition pruning / partition elimination not work for folder partitioned JSON files? (Spark 3.1.2)

MartinB
Contributor III

Imagine the following setup:

I have log files stored as JSON files partitioned by year, month, day and hour in physical folders:

"""
/logs
|-- year=2020
|-- year=2021
`-- year=2022
    |-- month=01
    `-- month=02
        |-- day=01
        |-- day=...
        `-- day=13
            |-- hour=0000
            |-- hour=...
            `-- hour=0900
                |-- log000001.json
                |-- <many files>
                `-- log000133.json
""""

Spark supports partition discovery for folder structures like this ("All built-in file sources (including Text/CSV/JSON/ORC/Parquet) are able to discover and infer partitioning information automatically" https://spark.apache.org/docs/latest/sql-data-sources-parquet.html#partition-discovery).

However, in contrast to Parquet files, I found that Spark does not use this meta information for partition pruning / partition elimination when reading JSON files.

In my use case I am only interested in logs from a specific time window (see filter below):

(spark
  .read
  .format('json')
  .load('/logs')
  .filter('year=2022 AND month=02 AND day=13 AND hour=0900')
)

I'd expect that Spark would be able to apply the filters on the partition columns "early" and only scan folders matching the filters (e.g. Spark would not need to scan folders and read files under '/logs/year=2020').

However, in practice the execution of my query takes a long time. It looks to me as if Spark first scans the whole filesystem starting at '/logs', reads all files and then applies the filters (on the already read data). Due to the nested folder structure and the large number of folders/files this is very expensive.

Apparently Spark does not push down the filter (i.e. does not apply partition pruning / partition elimination).
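
One way to check this is to look at the physical plan and see whether the partition predicates appear as "PartitionFilters" on the FileScan node or only in a Filter after the scan (rough sketch of the check, same layout as above):

df = (spark
  .read
  .format('json')
  .load('/logs')
  .filter('year=2022 AND month=02 AND day=13 AND hour=0900'))

# For Parquet the year/month/day/hour predicates show up under "PartitionFilters: [...]"
# in the FileScan line; in my case with JSON they only appear in the Filter node.
df.explain(True)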

For me it is weird that the behavior for processing JSON files differs from Parquet.

Is this as-designed or a bug?

For now, I ended up implementing partition pruning myself in a pre-processing step, using dbutils.fs.ls to scan the "right" folders iteratively and assemble an explicit file list that I then pass to the Spark read command.
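
Roughly like this (a simplified sketch; the helper name and the hard-coded time window are just for illustration, the real code walks the folder levels iteratively):

def json_files_for(base='/logs', year=2022, month=2, day=13, hour='0900'):
    # Build only the partition folder for the requested time window instead of
    # letting Spark discover everything under /logs, then list the JSON files in it.
    path = f'{base}/year={year}/month={month:02d}/day={day:02d}/hour={hour}'
    return [f.path for f in dbutils.fs.ls(path) if f.path.endswith('.json')]

df = spark.read.format('json').load(json_files_for())  # load() also accepts a list of paths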

16 REPLIES

AmanSehgal
Honored Contributor III

Instead of nested directories, could you try a single partition level and have your partition names as `year_month_day_hour` (assuming that you have your JSON files in the hour directory only)? That way Spark knows in one shot which partition it has to look at.

Querying could be expensive if your JSON files are very small in size (in KBs probably).

Maybe check the file sizes; instead of having log files per hour, you might be better off partitioning them per day.

Lastly, maybe try querying using the col function. Not sure if it'll help, but it's worth a try.

from pyspark.sql.functions import col

(spark
      .read
      .format('json')
      .load('/logs')
      .filter((col('year') == 2022) & (col('month') == 2) & (col('day') == 13) & (col('hour') == 900)))

MartinB
Contributor III

Hi @Aman Sehgal​ ,

thanks for your advice.

Unfortunately I have no influence on the partitioning of the data; I'm just a consumer 😣

Anyhow, I'd like to know why you think Spark would be able to apply partition elimination if there were just one partitioning level.

Imagine there were data for 3 years; that would mean 3*365*24 = 26,280 folders under /logs. As far as I can tell, Spark would still discover all those directories and load all the JSON files it finds into memory before applying the filter.

Or are you suggesting determining the right folder manually and then loading from the correct folder?

This would be "manual" partition elimination, in my opinion.

(spark
  .read
  .format('json')
  .load('/logs/2022_02_13_0900')
)

I also tried using the col function in the filter. Unfortunately it had no performance impact compared to specifying the filter as a SQL condition string. 😟

AmanSehgal
Honored Contributor III

Spark wouldn't discover all the directories. It'll go straight to the partition value.

Could you give more proof of your hypothesis, like Spark logs or the DAG?

My next guess would be that the log files are small in size. Could you check that and post the file sizes in the final partition?

I created a test setup.

I generated lots of sample rows simulating log entries: 90 days, each day with 600,000 log entries (= 54,000,000 rows). Each entry has a log timestamp. I created another column that "bins" each entry into the nearest 5-minute window.

I saved this DataFrame as JSON, partitioned by the 5-minute timestamp column.

So I ended up with 12,960 folders, each containing one JSON file.
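
For reference, the generation step looked roughly like this (a simplified sketch, not my exact code; the timestamp spacing is approximated and path_base_folder is just a placeholder location):

from pyspark.sql import functions as F

path_base_folder = '/tmp/json_partition_test'  # placeholder location
n_days = 90
rows_per_day = 600_000
start = F.unix_timestamp(F.lit('2022-01-01 00:00:00'))

# Simulate 90 days of log entries with a timestamp, spread evenly over each day,
# and bin every entry into its 5-minute window.
df = (spark.range(0, n_days * rows_per_day)
      .withColumn('day_idx', (F.col('id') / rows_per_day).cast('long'))
      .withColumn('sec_of_day', ((F.col('id') % rows_per_day) * 86_400 / rows_per_day).cast('long'))
      .withColumn('log_ts', F.timestamp_seconds(start + F.col('day_idx') * 86_400 + F.col('sec_of_day')))
      .withColumn('some_timestamp_bin_05m',
                  F.timestamp_seconds((F.unix_timestamp('log_ts') / 300).cast('long') * 300))
      .drop('day_idx', 'sec_of_day'))

# Write as JSON, one folder per 5-minute bin.
(df.write
   .format('json')
   .partitionBy('some_timestamp_bin_05m')
   .mode('overwrite')
   .save(path_base_folder))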

Then I tried:

(
    spark
    .read
    .format('json')
    .load(f'{path_base_folder}/some_timestamp_bin_05m=2022-03-23 18%3A00%3A00')
    .explain(extended=True)
)


As well as

from pyspark.sql import functions as F

(
    spark
    .read
    .format('json')
    .load(f'{path_base_folder}')
    .filter(F.col('some_timestamp_bin_05m') == "2022-03-23 18:00:00")
    .explain(extended=True)
)

As you can see from the "Input Size" metric, the second statement actually read all files and then applied the filter.

Interestingly, prior to the read job, two more jobs are carried out to scan the file system.


Any thoughts, @Aman Sehgal​ ?

AmanSehgal
Honored Contributor III

At this stage the only thing I can think of is the file format: JSON. Since you have the test setup, can you write all the data in Parquet or Delta format and then run the query?
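
Something like this, for example (just a sketch; path_parquet_folder is an illustrative target path):

from pyspark.sql import functions as F

path_parquet_folder = f'{path_base_folder}_parquet'  # illustrative target path

# Rewrite the same test data as Parquet with the same partitioning...
(spark.read.format('json').load(path_base_folder)
      .write
      .format('parquet')
      .partitionBy('some_timestamp_bin_05m')
      .mode('overwrite')
      .save(path_parquet_folder))

# ...then re-run the filtered read and compare the plan and the "Input Size" metric.
(spark.read.format('parquet').load(path_parquet_folder)
      .filter(F.col('some_timestamp_bin_05m') == "2022-03-23 18:00:00")
      .explain(extended=True))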

Hi @Aman Sehgal​ ,

Yes, when I change the test setup to Parquet format, the partition elimination works.

This is my original question: Is the partition elimination supposed to work only for Parquet / ORC or also for JSON?

My findings show that it does not work for JSON. Is this a bug or an unsupported feature?

Hi @Kaniz Fatma​ ,

Yes, I'd like to open a feature request.

However, I cannot access the ideas portal.

I'm just a Community Edition user and don't have a workspace domain...


Hi @Kaniz Fatma​ ,

Are you positive that the ideas portal should work for Community Edition users?

When I try to log into the ideas portal using the "community" workspace, I always get an error message.


Hi @Kaniz Fatma​ , any updates?

Hi @Kaniz Fatma​ ,

Yes, this is what I did, and I end up with the same error every time after the login.


Hi @Kaniz Fatma​ ,

Any updates on this one?

Hi @Kaniz Fatma​ ,

Yes, I did. This time the error from before is no longer displayed.

But following your link https://databricks.com/feedback I end up on the landing page in my community workspace; I had expected a feedback portal.

In my workspace, under "Help", there is another "Feedback" button.

But this is just a mailto: link to the address feedback@databricks.com.

Is this the intended way to make feature requests?

Hi @Kaniz Fatma​ ,

When I try to access https://ideas.databricks.com/ the situation is just as I described a month ago: after the login, an error is displayed.

Last month, my understanding was that you were going to check why that is the case.

Are you positive that Databricks Community Edition (free) users are allowed to access the ideas portal?
