06-26-2022 02:54 PM
I am trying to load parquet files using Autoloader. Below is the code
def autoload_to_table(data_source, source_format, table_name, checkpoint_path):
    query = (spark.readStream
             .format('cloudFiles')
             .option('cloudFiles.format', source_format)
             .schema("VendorID long, tpep_pickup_datetime timestamp, tpep_dropoff_datetime timestamp, passenger_count long, trip_distance long, RateCodeID long, Store_and_fwd_flag string, PULocationID int, DOLocationID long, payment_type long, fare_amount long, extra long, mta_tax long, Tip_amount long, tolls_amount long, improvement_surcharge long, total_amount long, congestion_Surcharge long, airport_fee long")
             .option('cloudFiles.schemaLocation', checkpoint_path)
             .load(data_source)
             .writeStream
             .option('checkpointLocation', checkpoint_path)
             .option('mergeSchema', "true")
             .table(table_name)
            )
    return query
query = autoload_to_table(data_source="/mnt/landing/nyctaxi",
                          source_format="parquet",
                          table_name="yellow_trip_data",
                          checkpoint_path="/tmp/delta/yellowdata/_checkpoints")
However, I run into the following error. I have also attached the IPython notebook.
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3011.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3011.0 (TID 11673) (10.139.64.5 executor 0): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
at org.apache.parquet.column.Dictionary.decodeToLong(Dictionary.java:49)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToLong(ParquetDictionary.java:54)
06-27-2022 12:19 AM
It could be an incompatible schema; there is a knowledge base article about that. The decodeToLong call failing on PlainDoubleDictionary in your stack trace suggests that a column stored as double in the Parquet files is declared as long in your schema (trip_distance and the amount columns are likely candidates).
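A quick way to confirm the mismatch (a sketch, using the path from your post) is to let Spark infer the files' actual types and compare them with your declared schema:

# Print the types Spark actually infers from the Parquet files
actual_schema = spark.read.parquet("/mnt/landing/nyctaxi").schema
for field in actual_schema.fields:
    print(field.name, field.dataType.simpleString())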
06-27-2022 07:50 AM
As @Werner Stinckens said.
Just load your file the normal way (spark.read.parquet) without specifying a schema, and then extract the DDL:
schema_json = spark.read.parquet("your_file.parquet").schema.json()
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()
print(ddl)
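To close the loop, a minimal sketch of feeding the extracted DDL back into the Autoloader call instead of the hand-typed schema string (ddl comes from the snippet above; the path, table, and checkpoint names are the ones from the original post):

# Reuse the derived DDL so the declared types always match the files
query = (spark.readStream
         .format('cloudFiles')
         .option('cloudFiles.format', 'parquet')
         .schema(ddl)
         .option('cloudFiles.schemaLocation', '/tmp/delta/yellowdata/_checkpoints')
         .load('/mnt/landing/nyctaxi')
         .writeStream
         .option('checkpointLocation', '/tmp/delta/yellowdata/_checkpoints')
         .table('yellow_trip_data'))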
06-27-2022 08:25 AM
Smart idea. Let me try this one. @Hubert Dudek
06-27-2022 08:45 AM
This ran!!! You are awesome, @Hubert Dudek