Unable to load Parquet file using Autoloader. Can someone help?

Mayank
New Contributor III

I am trying to load parquet files using Autoloader. Below is the code

def autoload_to_table (data_source, source_format, table_name, checkpoint_path):
    query = (spark.readStream
                  .format('cloudFiles')
                  .option('cloudFiles.format', source_format)
                  .schema("VendorID long,tpep_pickup_datetime timestamp, tpep_dropoff_datetime timestamp, passenger_count long, trip_distance long, RateCodeID long,  Store_and_fwd_flag string,PULocationID int, DOLocationID long, payment_type long, fare_amount long, extra long, mta_tax long,Tip_amount long, tolls_amount long, improvement_surcharge long,  total_amount long, congestion_Surcharge long, airport_fee long ")
                  .option('cloudFiles.schemaLocation', checkpoint_path)
                  .load(data_source)
                  .writeStream
                  .option('checkpointLocation', checkpoint_path)
                  .option('mergeSchema', "true")
                  .table(table_name)
            )
    
    return query
 
query = autoload_to_table (data_source = "/mnt/landing/nyctaxi",
                           source_format = "parquet",
                           table_name = "yellow_trip_data",
                           checkpoint_path='/tmp/delta/yellowdata/_checkpoints'
                          )

However, I run into the following error. i have also attached the ipython notebook/

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3011.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3011.0 (TID 11673) (10.139.64.5 executor 0): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary

at org.apache.parquet.column.Dictionary.decodeToLong(Dictionary.java:49)

at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToLong(ParquetDictionary.java:54)