โ06-26-2022 02:54 PM
I am trying to load parquet files using Autoloader. Below is the code
def autoload_to_table (data_source, source_format, table_name, checkpoint_path):
query = (spark.readStream
.format('cloudFiles')
.option('cloudFiles.format', source_format)
.schema("VendorID long,tpep_pickup_datetime timestamp, tpep_dropoff_datetime timestamp, passenger_count long, trip_distance long, RateCodeID long, Store_and_fwd_flag string,PULocationID int, DOLocationID long, payment_type long, fare_amount long, extra long, mta_tax long,Tip_amount long, tolls_amount long, improvement_surcharge long, total_amount long, congestion_Surcharge long, airport_fee long ")
.option('cloudFiles.schemaLocation', checkpoint_path)
.load(data_source)
.writeStream
.option('checkpointLocation', checkpoint_path)
.option('mergeSchema', "true")
.table(table_name)
)
return query
query = autoload_to_table (data_source = "/mnt/landing/nyctaxi",
source_format = "parquet",
table_name = "yellow_trip_data",
checkpoint_path='/tmp/delta/yellowdata/_checkpoints'
)
However, I run into the following error. i have also attached the ipython notebook/
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3011.0 failed 4 times, most recent failure: Lost task 0.3 in stage 3011.0 (TID 11673) (10.139.64.5 executor 0): java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainDoubleDictionary
at org.apache.parquet.column.Dictionary.decodeToLong(Dictionary.java:49)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToLong(ParquetDictionary.java:54)
โ06-27-2022 07:50 AM
As @Werner Stinckensโ said.
Just load your file the normal way (spark.read.parquet ) without specifying schema and then extract DDL.
schema_json = spark.read.parquet("your_file.parquet").schema.json()
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()
print(ddl)
โ06-27-2022 12:19 AM
it could be an incompatible schema,
there is a knowledge base article about that.
โ06-27-2022 07:50 AM
As @Werner Stinckensโ said.
Just load your file the normal way (spark.read.parquet ) without specifying schema and then extract DDL.
schema_json = spark.read.parquet("your_file.parquet").schema.json()
ddl = spark.sparkContext._jvm.org.apache.spark.sql.types.DataType.fromJson(schema_json).toDDL()
print(ddl)
โ06-27-2022 08:25 AM
Smart idea. Let me try this one. @Hubert Dudekโ
โ06-27-2022 08:45 AM
This ran !!! you are awesome @Hubert Dudekโ
โ06-28-2022 06:50 AM
Great! Thank you.
โ06-27-2022 10:15 AM
Hey @Mayank Srivastavaโ
Hope you are well!
We are happy to know that you were able to resolve your issue. It would be really awesome if you could mark the answer as best. It would be really helpful for the other members too.
Cheers!
โ06-27-2022 10:49 AM
Done !!
โ06-27-2022 11:06 AM
Hi again @Mayank Srivastavaโ
Thank you so much for getting back to us and marking the answer as best.
We really appreciate your time.
Wish you a great Databricks journey ahead!
Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you wonโt want to miss the chance to attend and share knowledge.
If there isnโt a group near you, start one and help create a community that brings people together.
Request a New Group