PySpark read of multiple Parquet files fails on type expansion (float vs. double)

Erik_L
Contributor II

Problem

Reading a directory of nearly identical Parquet tables fails when some files store column X as float and others store it as double. Spark infers a single schema from one file's footer, and the read then typically fails with a "Parquet column cannot be converted" error when it hits a file whose physical type does not match.
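For concreteness, here is a minimal repro sketch of the failure; the paths, column name X, and values are hypothetical stand-ins for the real data.

from pyspark.sql.types import StructType, StructField, FloatType, DoubleType

# Two otherwise-identical Parquet files whose column X differs only in width.
spark.createDataFrame([(1.0,)], StructType([StructField("X", FloatType())])) \
    .write.parquet("/tmp/mixed/a")
spark.createDataFrame([(2.0,)], StructType([StructField("X", DoubleType())])) \
    .write.parquet("/tmp/mixed/b")

# Reading both together fails once the inferred schema meets the other file,
# typically with "Parquet column cannot be converted".
spark.read.parquet("/tmp/mixed/a", "/tmp/mixed/b").show()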

Attempts at resolving

  1. Using streaming file reads
  2. Disabling Delta caching and vectorization (sketched below)
  3. Calling .cache() explicitly
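A sketch of attempts 2 and 3; the conf names are real Spark/Databricks settings, but neither change resolved the failure here.

# Attempt 2: disable the Databricks IO cache and the vectorized Parquet reader.
spark.conf.set("spark.databricks.io.cache.enabled", "false")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")

# Attempt 3: cache the DataFrame explicitly.
df = spark.read.parquet("s3://my-bucket/data/*").cache()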

Notes

This is a known problem, but I need a workaround.

Example code

(spark.read.option("mergeSchema", False)
    # Note: the IO cache flag is normally a session conf (spark.conf.set);
    # passed as a reader option here, it is likely ignored.
    .option("spark.databricks.io.cache.enabled", False)
    .parquet("s3://my-bucket/data/*")
    .write.mode("append").saveAsTable("my_table"))

1 ACCEPTED SOLUTION

Erik_L
Contributor II

After many, many hours of trying to resolve this, I figured out a hack that _solves_ the problem, though it's not optimal: I read the directory listing of files myself, merge the per-file DataFrames via unions, and then save the result out.

from pyspark.sql.types import (
    StructType, StructField, StringType, TimestampType, LongType, DoubleType,
)

my_schema = StructType([
    StructField("ordered", StringType()),
    StructField("by", TimestampType()),
    StructField("schema", LongType()),
    StructField("provided", DoubleType()),
])
# Start from an empty DataFrame with the target schema, then union each file in.
df = spark.createDataFrame(data=[], schema=my_schema)

# ...
for table_file in table_files:
    df = df.union(
        spark.read.option("mergeSchema", False)
        .option("spark.databricks.io.cache.enabled", False)
        .parquet(f"s3://my-bucket/data/{table_file}")
        # Transformations
        .select("ordered", "by", "schema", "provided")
    )
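For reference, the same per-file union approach can be written more compactly with functools.reduce; casting the mismatched column (assumed here to be provided) up front makes every piece union-compatible without relying on an empty seed DataFrame. This is a sketch under those assumptions, not verified against the original data.

from functools import reduce
from pyspark.sql.functions import col

# table_files is the directory listing gathered above.
dfs = [
    spark.read.parquet(f"s3://my-bucket/data/{table_file}")
        .select("ordered", "by", "schema",
                col("provided").cast("double").alias("provided"))
    for table_file in table_files
]
df = reduce(lambda a, b: a.union(b), dfs)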


2 REPLIES


Anonymous
Not applicable

Hi @Erik Louie

Help us build a vibrant and resourceful community by recognizing and highlighting insightful contributions. Mark the best answers and show your appreciation!

Regards
