How to merge Parquet files with different column types

Erik_L
Contributor II

Problem

I have a directory in S3 with a bunch of data files, like "data-20221101.parquet". They all have the same columns: timestamp, reading_a, reading_b, and reading_c. In the earlier files the readings are floats, but in the later ones they are doubles. When I run the following read, it fails while merging the files.

from pyspark.sql.types import DoubleType, LongType, StructField, StructType

# Target schema: all readings as doubles.
schema = StructType([
    StructField("timestamp", LongType()),
    StructField("reading_a", DoubleType()),
    StructField("reading_b", DoubleType()),
    StructField("reading_c", DoubleType()),
])

# Read every matching file under the prefix and save the result as a table.
(spark.read.schema(schema)
    .option("mergeSchema", False)
    .parquet('s3://readings/path/to/data/data-*.parquet')
    .write
    .saveAsTable('readings.data'))

It gives the following error:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.MutableDouble cannot be cast to org.apache.spark.sql.catalyst.expressions.MutableFloat

Attempts

  • Turning mergeSchema on and off;
  • setting the schema and not setting it; and
  • reading individual files (this succeeds).

What I think is going on

Spark reads a file that has the float type and then keeps that type while reading the remaining files, instead of upcasting to double, so it fails when it reaches a file that stores doubles. Really, Spark should obey my supplied schema from the start and always upcast.
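
For example, checking a couple of files individually shows the type difference (the two paths here are just illustrative examples from the prefix):

# Print each file's actual Parquet schema to see which ones store floats
# and which store doubles (paths are illustrative).
for path in ['s3://readings/path/to/data/data-20221101.parquet',
             's3://readings/path/to/data/data-20230101.parquet']:
    print(path, spark.read.parquet(path).schema.simpleString())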

More info

Someone else did a rather deep dive into solving this and shows a bunch of different methods, but their final solution is a hack and not sustainable: they read every file individually, cast each one to the target schema, and then merge the results (sketched below). This negates a lot of the benefit of Spark's magical reading capabilities.

https://medium.com/data-arena/merging-different-schemas-in-apache-spark-2a9caca2c5ce
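
Concretely, that per-file approach looks roughly like this with my schema (a sketch only; dbutils.fs.ls is the Databricks helper for listing the prefix):

import functools
from pyspark.sql.functions import col

# List the data files under the prefix (dbutils.fs.ls is Databricks-specific).
paths = [f.path for f in dbutils.fs.ls('s3://readings/path/to/data/')
         if f.name.startswith('data-') and f.name.endswith('.parquet')]

# Read each file on its own (which works), cast every column to the target
# schema defined above, then union the per-file DataFrames.
dfs = [
    spark.read.parquet(p)
         .select([col(f.name).cast(f.dataType) for f in schema.fields])
    for p in paths
]
merged = functools.reduce(lambda a, b: a.unionByName(b), dfs)
merged.write.saveAsTable('readings.data')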

Question

How can I read many files whose Parquet types differ only slightly, without resorting to the hack above?

1 REPLY

mathan_pillai
Valued Contributor

1) Can you let us know what error message you get when you don't set the schema and use mergeSchema?

2) What happens when you define the schema (with FloatType) and use mergeSchema? What error message do you get?
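
To clarify (2), I mean something along these lines (column names taken from your snippet; the final cast back to double is an assumption about how you would get to the schema you want):

from pyspark.sql.functions import col
from pyspark.sql.types import DoubleType, FloatType, LongType, StructField, StructType

# Schema matching the older files: readings stored as floats.
float_schema = StructType([
    StructField("timestamp", LongType()),
    StructField("reading_a", FloatType()),
    StructField("reading_b", FloatType()),
    StructField("reading_c", FloatType()),
])

df = (spark.read.schema(float_schema)
      .option("mergeSchema", True)
      .parquet('s3://readings/path/to/data/data-*.parquet'))

# If the read succeeds, upcast the float columns to double before writing.
(df.select([col(f.name).cast(DoubleType()) if isinstance(f.dataType, FloatType)
            else col(f.name)
            for f in float_schema.fields])
   .write
   .saveAsTable('readings.data'))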
