Data Engineering
Read large volume of parquet files

Shan1
New Contributor II

I have 50k+ parquet files in Azure Data Lake, and I have a mount point as well. I need to read all of the files and load them into one DataFrame; there are around 2 billion records in total. Not all of the files have all of the columns, the column order may differ, and the column data types may differ. I have tried mergeSchema, inferSchema, and a custom schema with every column as string, and nothing is working. Finally, I decided to collect all the file paths into a list and iterate over it, reading the files one by one. Is this fine, or is there a better solution?

from pyspark.sql.types import StructType, StructField, StringType
from functools import reduce

# Read every column as a string so that type differences between files do not matter.
schema = StructType([
    StructField("COL1", StringType(), nullable=True),
    StructField("COL2", StringType(), nullable=True),
    StructField("COL3", StringType(), nullable=True),
    StructField("COL4", StringType(), nullable=True)
])

# List every file under the mount point (note: dbutils.fs.ls is not recursive).
files = [file.path for file in dbutils.fs.ls("datalake_path_here")]

def load_data(file_path):
    return spark.read.format("parquet").schema(schema).load(file_path)

# Read each file individually, then union the per-file DataFrames.
dfs = []
for file_path in files:
    dfs.append(load_data(file_path))
final_df = reduce(lambda df1, df2: df1.union(df2), dfs)
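Since the files differ in columns, order, and types, here is a sketch of a variant of that loop (assumptions: the four columns above are the full column set, and files is the list built above) that reads each file with its own schema, fills in any missing columns with nulls, casts everything to string, and unions by name rather than by position:

from functools import reduce
from pyspark.sql import functions as F

TARGET_COLS = ["COL1", "COL2", "COL3", "COL4"]  # assumed full column set

def normalize(df):
    # Add any missing columns as nulls, cast all columns to string,
    # and project in a fixed order so the unions line up.
    for c in TARGET_COLS:
        if c not in df.columns:
            df = df.withColumn(c, F.lit(None))
    return df.select([F.col(c).cast("string").alias(c) for c in TARGET_COLS])

dfs = [normalize(spark.read.parquet(p)) for p in files]
final_df = reduce(lambda a, b: a.unionByName(b), dfs)

One caveat: a union over 50k DataFrames builds a very deep query plan, so it may help to union in batches and write out or checkpoint intermediate results.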

 

5 REPLIES

shan_chandra
Honored Contributor III

@Shan1 - Could you please let us know if you need to add a file path column into the dataframe?

Shan1
New Contributor II

No, it's not required to add the file path column into the dataframe.

shan_chandra
Honored Contributor III

@Shan1 - Thanks for the response. Can you please try the below and let us know if it works?

spark.read.option("mergeSchema", "true").parquet("/path/to/parquet/files")

 

Shan1
New Contributor II

I tried that already and got an error like [CANNOT_MERGE_SCHEMA] Failed merging schemas.

Additional error info: Schema that cannot be merged with the initial schema.

shan_chandra
Honored Contributor III

@Shan1 - This could be due to the files having columns that differ by data type, e.g. integer vs. long, or boolean vs. integer. It can be resolved by setting mergeSchema to false. Please refer to this code:
https://github.com/apache/spark/blob/418bba5ad6053449a141f3c9c31ed3ad998995b8/sql/core/src/test/scal...
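For reference, a minimal sketch of that suggestion (the path is a placeholder, and schema is the all-string schema from the original post). Note that supplying an explicit schema skips schema merging entirely, although a read can still fail at execution time if a file's physical type cannot be converted to the requested type:

# Disable footer merging and supply an explicit schema, so Spark does not
# try to reconcile the conflicting per-file schemas up front.
df = (spark.read
      .schema(schema)                  # explicit schema, no inference
      .option("mergeSchema", "false")  # do not merge file footers
      .parquet("/path/to/parquet/files"))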
