06-26-2022 02:54 PM
I am trying to load Parquet files using Auto Loader. Below is the code:

```python
def autoload_to_table(data_source, source_format, table_name, checkpoint_path):
    query = (spark.readStream
             .format('cloudFiles')
             .option('cloudFiles.format', source_format)
             .schema("VendorID long,tpep_pickup_datetime timestamp, tpep_dropoff_datetime timestamp, passenger_count long, trip_distance long, RateCodeID long, Store_and_fwd_flag string,PULocationID int, DOLocationID long, payment_type long, fare_amount long, extra long, mta_tax long,Tip_amount long, tolls_amount long, improvement_surcharge long, total_amount long, congestion_Surcharge long, airport_fee long ")
             .option('cloudFiles.schemaLocation', checkpoint_path)
             .load(data_source)
             .writeStream
             .option('checkpointLocation', checkpoint_path)
             .option('mergeSchema', "true")
             .table(table_name)
             )
    return query

query = autoload_to_table(data_source="/mnt/landing/nyctaxi",
                          source_format="parquet",
                          table_name="yellow_trip_data",
                          checkpoint_path='/tmp/delta/yellowdata/_checkpoints'
                          )
```

However, I run into the following error. I have also attached the IPython notebook.
```
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3011.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3011.0 (TID 11673) (10.139.64.5 executor 0): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
	at org.apache.parquet.column.Dictionary.decodeToLong(Dictionary.java:49)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToLong(ParquetDictionary.java:54)
```
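The trace points to a type mismatch, not to the checkpoint path or Auto Loader itself. `PlainDoubleDictionary` combined with `decodeToLong` means the Parquet reader found a column whose values are stored as `DOUBLE`, while the schema passed to `.schema(...)` declares that column as `long`, and the reader cannot convert one to the other. The sketch below shows one possible fix: declare the fractional columns as `double`. The specific types are an assumption about the files under `/mnt/landing/nyctaxi` (distance, fare, and surcharge columns stored as doubles, location IDs as 64-bit integers), so confirm them with `spark.read.parquet(data_source).printSchema()` before relying on them. `YELLOW_TRIP_COLUMNS`, `to_ddl`, and `YELLOW_TRIP_SCHEMA` are hypothetical names. The function takes `spark` as an explicit argument so the block is self-contained, and it uses `toTable`, the standard PySpark `DataStreamWriter` method, in place of `.table`.

```python
# ASSUMPTION: these types are a guess at how the source Parquet files are
# stored; verify with spark.read.parquet(data_source).printSchema().
# Column names keep the casing from the original post (Spark resolves
# column names case-insensitively by default).
YELLOW_TRIP_COLUMNS = [
    ("VendorID", "long"),
    ("tpep_pickup_datetime", "timestamp"),
    ("tpep_dropoff_datetime", "timestamp"),
    ("passenger_count", "double"),
    ("trip_distance", "double"),
    ("RateCodeID", "double"),
    ("Store_and_fwd_flag", "string"),
    ("PULocationID", "long"),
    ("DOLocationID", "long"),
    ("payment_type", "long"),
    ("fare_amount", "double"),
    ("extra", "double"),
    ("mta_tax", "double"),
    ("Tip_amount", "double"),
    ("tolls_amount", "double"),
    ("improvement_surcharge", "double"),
    ("total_amount", "double"),
    ("congestion_Surcharge", "double"),
    ("airport_fee", "double"),
]


def to_ddl(columns):
    """Join (name, type) pairs into a DDL string accepted by DataStreamReader.schema()."""
    return ", ".join(f"{name} {dtype}" for name, dtype in columns)


YELLOW_TRIP_SCHEMA = to_ddl(YELLOW_TRIP_COLUMNS)


def autoload_to_table(spark, data_source, source_format, table_name, checkpoint_path):
    """Start an Auto Loader stream into table_name; `spark` is the active SparkSession."""
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", source_format)
        .option("cloudFiles.schemaLocation", checkpoint_path)
        # Declared types must match the Parquet physical types,
        # otherwise decoding fails as in the trace above.
        .schema(YELLOW_TRIP_SCHEMA)
        .load(data_source)
        .writeStream
        .option("checkpointLocation", checkpoint_path)
        .option("mergeSchema", "true")
        .toTable(table_name)
    )
```

It would be called as `autoload_to_table(spark, "/mnt/landing/nyctaxi", "parquet", "yellow_trip_data", "/tmp/delta/yellowdata/_checkpoints")`. The failed run may already have recorded the `long` types in the checkpoint or the target table, so retesting against a fresh `checkpoint_path` (and table) is probably safer. Another option is to drop `.schema(...)` entirely and let Auto Loader take the column types from the Parquet files, tracking them under `cloudFiles.schemaLocation`.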