PySpark read of multiple Parquet files fails on type expansion

Erik_L
Contributor II

Problem

Reading nearly identical Parquet tables from a single directory fails when some files store column X as float and others store it as double.
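
A minimal sketch that reproduces this kind of mismatch (the path, column name, and values here are hypothetical, not from the original post):

from pyspark.sql.types import StructType, StructField, FloatType, DoubleType

base = "/tmp/mixed_parquet_demo"  # hypothetical prefix; on Databricks this would be an S3/DBFS path

# One file stores column "x" as float ...
(spark.createDataFrame([(1.0,)], StructType([StructField("x", FloatType())]))
    .write.mode("overwrite").parquet(f"{base}/a"))

# ... another stores the same column as double.
(spark.createDataFrame([(1.0,)], StructType([StructField("x", DoubleType())]))
    .write.mode("overwrite").parquet(f"{base}/b"))

# Reading both together typically fails on affected runtimes with a Parquet
# column conversion error, because the scan uses one file's type for "x".
spark.read.parquet(f"{base}/*").show()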

Attempts at resolving

  1. Reading the files as a stream instead of a batch read
  2. Disabling Delta caching and the vectorized Parquet reader (configs sketched below)
  3. Calling .cache() explicitly
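
For reference, attempt 2 is normally done through session configuration; these are the standard keys (a sketch, since the exact settings used aren't shown in the post):

# Disable the Databricks disk (delta) cache and Spark's vectorized Parquet reader.
spark.conf.set("spark.databricks.io.cache.enabled", "false")
spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")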

Notes

This is a known problem, but I need a workaround.

Example code

# Read every Parquet file under the prefix without schema merging and append
# the result to a table. This is the read that fails when files disagree on a
# column's type (float vs. double). Note that the io cache flag is normally a
# session conf rather than a reader option.
(spark.read.option("mergeSchema", False)
    .option("spark.databricks.io.cache.enabled", False)
    .parquet("s3://my-bucket/data/*")
    .write.mode("append").saveAsTable("my_table"))

1 ACCEPTED SOLUTION

Erik_L
Contributor II

After many, many hours of trying to resolve this, I found a hack that _solves_ the problem, though it's not optimal: I list the files in the directory myself, read each file separately, merge them with unions, and then save the result out.
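
One possible way to build table_files on Databricks (hypothetical; the author's actual listing code is elided as "# ..." in the snippet below):

# Hypothetical listing step: collect the Parquet file names under the prefix.
table_files = [f.name for f in dbutils.fs.ls("s3://my-bucket/data/")]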

from pyspark.sql.types import (DoubleType, LongType, StringType,
                               StructField, StructType, TimestampType)

# Empty DataFrame carrying the target (widest) schema, with double for the
# column whose type varies between float and double across files.
my_schema = StructType([
    StructField("ordered", StringType()),
    StructField("by", TimestampType()),
    StructField("schema", LongType()),
    StructField("provided", DoubleType()),
])
df = spark.createDataFrame(data=[], schema=my_schema)

# ...
        for table_file in table_files:
            # Reading each file on its own keeps the mismatched types out of a
            # single Parquet scan; the union then widens float to double.
            df = df.union(
                spark.read.option("mergeSchema", False)
                .option("spark.databricks.io.cache.enabled", False)
                .parquet(f"s3://my-bucket/data/{table_file}")
                # Transformations
                .select('ordered', 'by', 'schema', 'provided')
            )
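
The post mentions a final "save out" that the snippet doesn't show; presumably it is the same append as in the question (table name taken from the original example):

# Write the merged DataFrame out once, after all files have been unioned.
df.write.mode("append").saveAsTable("my_table")

One caveat with this approach: chaining one union per file makes the query plan grow with the number of files, which is likely part of why the author calls it a non-optimal hack.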


2 REPLIES 2


Anonymous
Not applicable

Hi @Erik Louie

Help us build a vibrant and resourceful community by recognizing and highlighting insightful contributions. Mark the best answers and show your appreciation!

Regards
