07-22-2015 01:15 PM
Hi,
I need some guidelines for a performance issue with Parquet files:
I am loading a set of parquet files using: df = sqlContext.parquetFile(folder_path)
My parquet folder has 6 sub-division (partition) keys.
It was initially OK with a first sample of data organized this way, so I started pushing more, and performance is degrading very quickly as I do so.
Because of the way data arrives every day, the above folder partitioning is "natural", BUT it leads to small files, which I have read is a common explanation for this kind of bottleneck.
Should I merge several of the sub-folders in a second phase? If so, what function (Python API) should I use for this?
07-24-2015 10:17 AM
Hi Mzaradzki -
In Spark 1.5 we will be adding a feature to improve metadata caching for Parquet specifically, so it should greatly improve performance for your use case above.
One option to improve performance in Databricks is to use the dbutils.fs.cacheFiles function to move your parquet files to the SSDs attached to the workers in your cluster.
Cheers,
Richard
07-24-2015 01:37 PM
Hi Richard,
Will this actually parallelize reading the footers, or just help for Spark-generated parquet files? WRT the serialized footer reading, I haven't noticed large gains from caching the files on the SSDs.
Cheers,
Ken
07-24-2015 10:22 AM
Hi,
There are a couple of SQL optimizations I recommend for you to consider.
1) Making use of partitions for your table may help if you frequently only access data from certain days at a time. There's a notebook in the Databricks Guide called "Partitioned Tables" with more details (see the short sketch after the snippet below).
2) If your files are really small, it is true that you may get better performance by consolidating those files into a smaller number. You can do that easily in Spark with a command like this:
(sqlContext.parquetFile(SOME_INPUT_FILEPATTERN)
    .coalesce(SOME_SMALLER_NUMBER_OF_DESIRED_PARTITIONS)
    .write.parquet(SOME_OUTPUT_DIRECTORY))
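Going back to option 1, here is a minimal sketch of writing and then reading a partitioned table (the event_date column and the paths are hypothetical placeholders; this assumes the Spark 1.4+ DataFrame API):
# Write the data partitioned by a date column; each distinct value becomes a sub-folder.
df.write.partitionBy("event_date").parquet("/mnt/warehouse/events")
# Reads that filter on the partition column only scan the matching sub-folders.
daily = sqlContext.read.parquet("/mnt/warehouse/events").where("event_date = '2015-07-01'")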
07-24-2015 10:28 AM
Having a large number of small files or folders can significantly deteriorate the performance of loading the data. The best approach is to keep the folders/files merged so that each file is around 64 MB in size. There are different ways to achieve this: your writer process can either buffer the records in memory and write only once it reaches a certain size, or, as a second phase, you can read the temp directory, consolidate the files, and write them out to a different location. If you want to do the latter, you can read each of your input directories as a DataFrame, union them, repartition the result to the number of files you want, and write it back out. A code snippet in Scala would be:
import scala.collection.mutable.MutableList
import org.apache.spark.sql.DataFrame

val dfSeq = MutableList[DataFrame]()
sourceDirsToConsolidate.map(dir => {
  val df = sqlContext.parquetFile(dir)
  dfSeq += df
})
val masterDf = dfSeq.reduce((df1, df2) => df1.unionAll(df2))
masterDf.coalesce(numOutputFiles).write.mode(saveMode).parquet(destDir)
The DataFrame API is the same in Python, so you should be able to convert this to Python easily.
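For reference, a rough Python equivalent of the Scala sketch above (source_dirs, num_output_files, save_mode, and dest_dir are placeholders, not names from this thread):
from functools import reduce

# Read each input directory as a DataFrame (Spark 1.x API, mirroring the Scala above).
dfs = [sqlContext.parquetFile(d) for d in source_dirs]
# Union them into a single DataFrame.
master_df = reduce(lambda a, b: a.unionAll(b), dfs)
# Coalesce to the desired number of output files and write the consolidated copy.
master_df.coalesce(num_output_files).write.mode(save_mode).parquet(dest_dir)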
07-16-2017 12:22 PM
Hi Prakash,
I am trying to transfer Parquet files from Hadoop on-prem to S3. I am able to move normal HDFS files, but when it comes to Parquet it is not working properly.
Do you have any clue how we can transfer Parquet files from HDFS to S3?
Appreciate your response.
Thanks
Ishan
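One common approach is to let Spark itself do the copy: read the Parquet data from HDFS into a DataFrame and write it out to an s3a:// path. This is only a rough sketch, assuming S3 credentials are configured on the cluster; the paths and bucket name below are hypothetical.
# Read from the on-prem HDFS path and write straight to S3 (hypothetical paths).
df = sqlContext.read.parquet("hdfs:///data/events/parquet")
df.write.mode("overwrite").parquet("s3a://my-bucket/events/parquet")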
08-26-2019 11:25 PM
I have multiple small parquet files in all partitions. This is legacy data, and I want to merge the files in each individual partition directory into a single file. How can we achieve this?
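One approach consistent with the suggestions above is to consolidate each partition directory separately: read it, coalesce to one file, and write it to a new location. This is only a sketch; partition_dirs and the "_merged" suffix are assumptions, not anything from this thread.
# Rough per-partition consolidation; partition_dirs is a list you supply,
# e.g. ["/data/events/day=2019-08-01", "/data/events/day=2019-08-02"].
for part_dir in partition_dirs:
    df = sqlContext.read.parquet(part_dir)   # read all the small files in this partition
    (df.coalesce(1)                          # merge down to a single output file
       .write.mode("overwrite")
       .parquet(part_dir + "_merged"))       # write to a sibling dir, then swap it in
Writing to a sibling directory avoids overwriting the same path you are reading from within the same job.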