
Unable to load Parquet file using Autoloader. Can someone help?

Mayank
New Contributor III

I am trying to load Parquet files using Auto Loader. Below is the code:

def autoload_to_table(data_source, source_format, table_name, checkpoint_path):
    query = (spark.readStream
                  .format('cloudFiles')
                  .option('cloudFiles.format', source_format)
                  .schema("VendorID long, tpep_pickup_datetime timestamp, "
                          "tpep_dropoff_datetime timestamp, passenger_count long, "
                          "trip_distance long, RateCodeID long, "
                          "Store_and_fwd_flag string, PULocationID int, "
                          "DOLocationID long, payment_type long, fare_amount long, "
                          "extra long, mta_tax long, Tip_amount long, "
                          "tolls_amount long, improvement_surcharge long, "
                          "total_amount long, congestion_Surcharge long, "
                          "airport_fee long")
                  .option('cloudFiles.schemaLocation', checkpoint_path)
                  .load(data_source)
                  .writeStream
                  .option('checkpointLocation', checkpoint_path)
                  .option('mergeSchema', 'true')
                  .table(table_name)
            )
    return query
 
query = autoload_to_table(data_source="/mnt/landing/nyctaxi",
                          source_format="parquet",
                          table_name="yellow_trip_data",
                          checkpoint_path="/tmp/delta/yellowdata/_checkpoints")

However, I run into the following error. I have also attached the IPython notebook.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3011.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3011.0 (TID 11673) (10.139.64.5 executor 0): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
    at org.apache.parquet.column.Dictionary.decodeToLong(Dictionary.java:49)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToLong(ParquetDictionary.java:54)

1 ACCEPTED SOLUTION


Hubert-Dudek
Esteemed Contributor III

As @Werner Stinckens said.

Just load your file the normal way (spark.read.parquet) without specifying a schema, and then extract the DDL from the inferred schema:

# Let Spark infer the schema from the Parquet footer, then convert it to DDL
schema_json = spark.read.parquet("your_file.parquet").schema.json()
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()
print(ddl)
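For intuition, the toDDL call on the JVM side maps each field in the schema JSON to a `name TYPE` pair (long becomes BIGINT, integer becomes INT, and so on). A minimal pure-Python sketch of that mapping, covering primitive types only and written as an illustration rather than a replacement for Spark's converter:

```python
# Illustration only: approximate the JSON -> DDL mapping for a few primitive
# Spark types. Spark's DataType.fromJson(...).toDDL() is the real converter
# and also handles nested and complex types.
import json

_DDL_TYPES = {"long": "BIGINT", "integer": "INT", "double": "DOUBLE",
              "string": "STRING", "timestamp": "TIMESTAMP"}

def simple_schema_to_ddl(schema_json):
    """Convert a flat Spark schema JSON string to a DDL column list."""
    fields = json.loads(schema_json)["fields"]
    return ", ".join(f"{f['name']} {_DDL_TYPES[f['type']]}" for f in fields)
```

Pasting the printed DDL into the .schema(...) call then guarantees the declared types match what Spark inferred from the files.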


8 REPLIES

-werners-
Esteemed Contributor III

It could be an incompatible schema; there is a knowledge base article about that.


Mayank
New Contributor III

Smart idea. Let me try this one. @Hubert Dudek

Mayank
New Contributor III

This ran! You are awesome, @Hubert Dudek
