Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Read large volume of parquet files

Shan1
New Contributor II

I have 50k+ parquet files in Azure Data Lake, and I have a mount point as well. I need to read all the files and load them into a DataFrame; there are around 2 billion records in total. Not all files have all the columns, the column order may differ, and the column data types may differ. I have tried mergeSchema, inferSchema, and a custom schema with all columns as string: nothing is working. Finally I decided to read all the file paths into a list and iterate over it, reading the files one by one. Is this fine, or is there a better solution?

from pyspark.sql.types import StructType, StructField, StringType
from functools import reduce

# Common schema: every column read as string.
schema = StructType([
    StructField("COL1", StringType(), nullable=True),
    StructField("COL2", StringType(), nullable=True),
    StructField("COL3", StringType(), nullable=True),
    StructField("COL4", StringType(), nullable=True)
])

files = [file.path for file in dbutils.fs.ls("datalake_path_here")]

def load_data(file_path):
    return spark.read.format("parquet").schema(schema).load(file_path)

# Read each file separately, then union the results.
dfs = [load_data(file_path) for file_path in files]
final_df = reduce(lambda df1, df2: df1.union(df2), dfs)

 

5 REPLIES

shan_chandra
Esteemed Contributor

@Shan1 - Could you please let us know if you need to add a file path column into the DataFrame?

Shan1
New Contributor II

No, it's not required to add the file path column into the DataFrame.

shan_chandra
Esteemed Contributor

@Shan1 - Thanks for the response. Can you please try the below and let us know if it works?

spark.read.option("mergeSchema", "true").parquet("/path/to/parquet/files")

 

Shan1
New Contributor II

I tried that already and got an error like [CANNOT_MERGE_SCHEMA] Failed merging schemas.

Additional error info: a schema that cannot be merged with the initial schema.

shan_chandra
Esteemed Contributor

@Shan1 - This could be due to the files having columns that differ by data type, e.g. integer vs. long, or boolean vs. integer. It can be resolved by setting mergeSchema=false. Please refer to this code:
https://github.com/apache/spark/blob/418bba5ad6053449a141f3c9c31ed3ad998995b8/sql/core/src/test/scal...
