Read large volume of parquet files
02-26-2024 11:34 AM - edited 02-26-2024 11:37 AM
I have 50k+ parquet files in Azure Data Lake, and I have a mount point set up as well. I need to read all the files and load them into a dataframe; there are around 2 billion records in total. Not all files have all the columns, the column order may differ, and the column data types may differ. I have tried mergeSchema, inferSchema, and a custom schema with all columns as string data type, but nothing is working. Finally I decided to read all the files into a list and iterate over the files, reading them one by one. Is this fine, or is there a better solution available?
from pyspark.sql.types import StructType, StructField, StringType
from functools import reduce

# Common schema applied to every file; all columns read as strings.
schema = StructType([
    StructField("COL1", StringType(), nullable=True),
    StructField("COL2", StringType(), nullable=True),
    StructField("COL3", StringType(), nullable=True),
    StructField("COL4", StringType(), nullable=True)
])

# List every parquet file under the mount point.
files = [file.path for file in dbutils.fs.ls("datalake_path_here")]
dfs = []

def load_data(file_path):
    return spark.read.format("parquet").schema(schema).load(file_path)

# Read each file into its own DataFrame, then union them all.
for file_path in files:
    df = load_data(file_path)
    dfs.append(df)

final_df = reduce(lambda df1, df2: df1.union(df2), dfs)
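(Hedged aside, not part of the original post: reducing 50k DataFrames with union builds a very deep query plan that Spark can be slow to analyze. Since every file is read with the same explicit schema here, passing the whole file list to a single load should give the same result with one read node; a minimal sketch reusing the schema and files variables above.)

# DataFrameReader.load accepts a list of paths, so all files can be
# read in a single pass with the shared explicit schema.
final_df = (
    spark.read
        .format("parquet")
        .schema(schema)
        .load(files)
)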
02-26-2024 11:52 AM
@Shan1 - Could you please let us know if you need to add a file path column to the dataframe?
02-26-2024 11:53 AM
No, it's not required to add the file path column to the dataframe.
02-26-2024 11:58 AM
@Shan1 - Thanks for the response. Can you please try the below and let us know if it works?
spark.read.option("mergeSchema", "true").parquet("/path/to/parquet/files")
02-26-2024 12:02 PM
I tried that already and got an error like [CANNOT_MERGE_SCHEMA] Failed merging schemas.
Additional error info: schema that cannot be merged with the initial schema.
02-27-2024 08:37 AM - edited 02-27-2024 08:37 AM
@Shan1 - This could be because the files have columns that differ by data type, e.g. integer vs. long, or boolean vs. integer. It can be resolved with mergeSchema=false. Please refer to this code:
https://github.com/apache/spark/blob/418bba5ad6053449a141f3c9c31ed3ad998995b8/sql/core/src/test/scal...
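(For illustration, a minimal sketch not from the original reply; the /tmp/merge_demo paths and the COL1 column are made up for the example.) Two parquet files whose shared column has different physical types reproduce the merge failure described above:

# Write the same column as int in one location and long in another
# (illustrative paths; adjust for your workspace).
spark.range(5).selectExpr("cast(id as int) as COL1") \
    .write.mode("overwrite").parquet("/tmp/merge_demo/a")
spark.range(5).selectExpr("cast(id as long) as COL1") \
    .write.mode("overwrite").parquet("/tmp/merge_demo/b")

# Schema merging has to reconcile int vs bigint for COL1 and fails,
# raising an error like [CANNOT_MERGE_SCHEMA] on recent Spark versions.
spark.read.option("mergeSchema", "true") \
    .parquet("/tmp/merge_demo/a", "/tmp/merge_demo/b")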

