cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Pyspark read multiple Parquet type expansion failure

Erik_L
Contributor II

Problem

Reading nearly equivalent parquet tables in a directory with some having column X with type float and some with type double fails.

Attempts at resolving

  1. Using streaming files
  2. Removing delta caching, vectorization
  3. Using ,cache() explicitly

Notes

This is a known problem, but I need a work around.

Example code

(spark.read.option("mergeSchema", False)
    .option("spark.databricks.io.cache.enabled", False)
    .parquet(
        f"s3://my-bucket/data/*"
    )
    .write.mode("append").saveAsTable("my_table"))

1 ACCEPTED SOLUTION

Accepted Solutions

Erik_L
Contributor II

After many, many hours of trying to resolve this, I figured out a hack that _solves_ the problem, but it's not optimal. I basically read the directory listing of files and then merge them via unions and do a save out.

my_schema = StructType([
    StructField("ordered", StringType()),
    StructField("by", TimestampType()),
    StructField("schema", LongType()),
    StructField("provided", DoubleType()),
])
df = spark.createDataFrame(data=[], schema=my_schema)
 
# ...
        for table_file in table_files:
            df = df.union(
                spark.read.option("mergeSchema", False)
                .option("spark.databricks.io.cache.enabled", False)
                .parquet(
                    f"s3://my-bucket/data/{table_file}"
                )
                # Transformations
                .select('ordered', 'by', 'schema', 'provided')
            )

View solution in original post

2 REPLIES 2

Erik_L
Contributor II

After many, many hours of trying to resolve this, I figured out a hack that _solves_ the problem, but it's not optimal. I basically read the directory listing of files and then merge them via unions and do a save out.

my_schema = StructType([
    StructField("ordered", StringType()),
    StructField("by", TimestampType()),
    StructField("schema", LongType()),
    StructField("provided", DoubleType()),
])
df = spark.createDataFrame(data=[], schema=my_schema)
 
# ...
        for table_file in table_files:
            df = df.union(
                spark.read.option("mergeSchema", False)
                .option("spark.databricks.io.cache.enabled", False)
                .parquet(
                    f"s3://my-bucket/data/{table_file}"
                )
                # Transformations
                .select('ordered', 'by', 'schema', 'provided')
            )

Anonymous
Not applicable

Hi @Erik Louie​ 

Help us build a vibrant and resourceful community by recognizing and highlighting insightful contributions. Mark the best answers and show your appreciation!

Regards

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!