Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Two stage join fails with java.lang.UnsupportedOperationException: org.apache.parquet.column.values.dictionary.PlainValuesDictionary$PlainLongDictionary

ChristianKeller
New Contributor II

Sometimes the error appears wrapped in "org.apache.spark.SparkException: Exception thrown in awaitResult:".

The error comes from the step where we extract, for the second time, the rows whose data has been updated. We can count these rows, but we cannot display them or write them as a table.

Our approach is:

We import our data day by day: up to 500,000 rows per day and, for a complete picture of the needed information, up to 105 columns, although we have already reduced our table to the minimum of 8 columns.

We get both new data and updates.

Instead of overwriting existing rows with our updates, we would like to append them as incremental changes relative to the existing rows.

We import one initial table.

We ran the update/append algorithm once successfully: all join, union, and write commands worked.

Our table schema is:

We call the first five columns and the last column dimensions; the remaining columns are the basis for the increment computation:

  • from which server
  • the last update time (AS our import key)
  • the time of import
  • a classification information as integer
  • the ID (AS our filter key)
  • one integer type
  • one double type
  • the day the row was initially created in our database (AS our partition key)
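As a minimal sketch of the append-as-increment idea, assume that each update carries the new absolute values of the two increment columns and that the value currently represented by an Id is the sum of all rows already stored for it. The increment row to append is then the difference between the two. The column names `count` and `amount` and the dict-based rows are hypothetical stand-ins for the integer and double columns above; this is plain Python, not the Spark code from the post.

```python
from typing import Dict, List

# Hypothetical names for the two increment columns (the integer and the double).
INCREMENT_COLS = ("count", "amount")

def increment_row(existing: List[Dict], update: Dict) -> Dict:
    """Return a row that holds only the change relative to what is already stored.

    Assumption: the current value of each increment column is the sum of all rows
    already stored for this Id, and `update` carries the new absolute values.
    All other (dimension) columns are copied from the update unchanged.
    """
    delta = dict(update)
    for c in INCREMENT_COLS:
        current = sum(r[c] for r in existing)
        delta[c] = update[c] - current
    return delta

stored = [{"Id": 7, "count": 3, "amount": 10.0},
          {"Id": 7, "count": 1, "amount": 2.5}]
update = {"Id": 7, "count": 6, "amount": 20.0}
print(increment_row(stored, update))  # {'Id': 7, 'count': 2, 'amount': 7.5}
```

Appending the returned row keeps the history intact: summing all stored rows for Id 7 afterwards gives the updated absolute values.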
// Assumes BASE_TABLE_NAME and updatedBetId (a DataFrame with an "updateId" column) are defined earlier.
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions.col

// statusIdsChanged only consists of the Id column
val statusIdsChanged = {
  val statusRowsReduced = sqlContext.table(BASE_TABLE_NAME).select("Id")
  statusRowsReduced
    .join(updatedBetId, statusRowsReduced("Id") === updatedBetId("updateId"))
    .drop("updateId")
}
val status = sqlContext.table(BASE_TABLE_NAME)
status.count()

// Split the column names into those before "Id" and those from "Id" onwards
def splitNamesAt(df: Dataset[Row], splitName: String): (Array[String], Array[String]) = {
  val cols = df.columns
  cols.splitAt(cols.indexOf(splitName))
}
val (dimensionsStatus, incrementsStatus) = splitNamesAt(status, "Id")

// The following value is the error source.
val statusRowsChanged = status.join(statusIdsChanged, "Id")
  .select(incrementsStatus.map(col): _*)
  .drop("deliveryDay")
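To see which columns end up in `incrementsStatus`, here is a plain-Python mirror of `splitNamesAt`, assuming the column order listed above; the column names are hypothetical placeholders. Scala's `splitAt(indexOf("Id"))` puts "Id" itself at the start of the second half, so "Id" and the partition column "deliveryDay" land in `incrementsStatus` next to the two increment columns, which is why the snippet drops "deliveryDay" afterwards.

```python
from typing import List, Tuple

def split_names_at(columns: List[str], split_name: str) -> Tuple[List[str], List[str]]:
    """Mirror of Scala's cols.splitAt(cols.indexOf(splitName)).

    Everything before split_name goes to the first list; split_name itself and
    everything after it go to the second list. Unlike Scala's indexOf (which
    returns -1), list.index raises ValueError if the name is missing.
    """
    i = columns.index(split_name)
    return columns[:i], columns[i:]

# Hypothetical column names following the schema order described in the post.
cols = ["server", "lastUpdate", "importTime", "classification",
        "Id", "count", "amount", "deliveryDay"]
dims, incs = split_names_at(cols, "Id")
print(dims)  # ['server', 'lastUpdate', 'importTime', 'classification']
print(incs)  # ['Id', 'count', 'amount', 'deliveryDay']
```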


activescott
New Contributor III

I am running into this as well and the problem seems intermittent. Have you found a solution?

ChristianKeller
New Contributor II

@activescott Yes, we did. We found that, in our use case, certain data types cause the problem. In addition, the error occurs while Spark reads the data, e.g. after a transformation, but not always at the same point in the code. The cause is how the data was written. We overcame the problem as follows:

1. Check which data types your base data is automatically cast to during the "loading into Spark" step. For instance, in our case Spark had problems reading the SQL integer type?!

2. Pay attention to how Spark functions or your UDFs change the data types. For instance, we ran into the error because the precision or scale of our decimals changed.
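Both checks can be automated by comparing column types before and after a step. A minimal sketch, assuming the schemas are given as the `(name, type)` pairs that PySpark's `DataFrame.dtypes` returns; `find_type_changes` is a hypothetical helper and the sample type strings are illustrative.

```python
from typing import Dict, List, Tuple

def find_type_changes(before: List[Tuple[str, str]],
                      after: List[Tuple[str, str]]) -> Dict[str, Tuple[str, str]]:
    """Return {column: (old_type, new_type)} for every column present in both
    schemas whose type string differs, e.g. a bigint narrowed to int or a
    decimal whose precision or scale changed."""
    old = dict(before)
    return {name: (old[name], new_type)
            for name, new_type in after
            if name in old and old[name] != new_type}

before = [("Id", "bigint"), ("price", "decimal(10,2)")]
after = [("Id", "int"), ("price", "decimal(20,4)")]
print(find_type_changes(before, after))
# {'Id': ('bigint', 'int'), 'price': ('decimal(10,2)', 'decimal(20,4)')}
```

In a notebook, one could call `find_type_changes(df_in.dtypes, df_out.dtypes)` right before a write (where `df_in` and `df_out` are placeholders) and stop the job if the result is not empty.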

activescott
New Contributor III

Thanks Lleido. I eventually found that I had inadvertently changed the schema of a partitioned DataFrame: I had narrowed a column's type from long to integer. Although that is a rather obvious cause in hindsight, it was terribly difficult to troubleshoot at first because the failure was inconsistent. In the end, I found that turning on the "mergeSchema" option, which forces Spark to merge the schema across all partitions, at least made the problem manifest immediately rather than intermittently as different parts of the code ran (and accessed different old saved data frames). To merge the schema:

sqlContext.read.option('mergeSchema', True).parquet(...)
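The reason `mergeSchema` surfaces the problem early is that the schemas of all part-files are reconciled when the data frame is created. Without it, Spark roughly takes the schema from a single file and only fails when a task happens to decode a file whose physical type does not match, which fits the intermittent behavior described above. The following is a simplified plain-Python model of that reconciliation, assuming each partition's schema is a `{column: type}` dict and that any type conflict (such as `bigint` in old partitions versus `int` in new ones) is rejected; Spark's actual merge rules are more detailed than this.

```python
from typing import Dict, List

def merge_schemas(partition_schemas: List[Dict[str, str]]) -> Dict[str, str]:
    """Union the columns of all partition schemas, failing on any type conflict.

    Simplified model: a column missing from some partitions is allowed, but a
    column that appears with two different types raises immediately.
    """
    merged: Dict[str, str] = {}
    for schema in partition_schemas:
        for name, dtype in schema.items():
            if name in merged and merged[name] != dtype:
                raise TypeError(
                    f"incompatible types for column {name!r}: {merged[name]} vs {dtype}")
            merged.setdefault(name, dtype)
    return merged

old_partition = {"Id": "bigint", "amount": "double"}
new_partition = {"Id": "int", "amount": "double"}
try:
    merge_schemas([old_partition, new_partition])
except TypeError as err:
    print(err)  # incompatible types for column 'Id': bigint vs int
```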

vsamma
New Contributor II

So you added "mergeSchema" so that you would find the error immediately when some column types do not match? But how did you fix the actual problem of the same column having different types in different sets of identically structured data? For example, I have raw JSON data with a column that is supposed to be of type double, but if one set happens to contain only whole numbers, they are written in the JSON without decimal points, so Spark cannot know the column should be double and reads it as long. Then, when I write that data as Parquet and later try to read it together with other sets, the read fails.
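The inference problem described here can be reproduced without Spark. In the plain-Python model below, a batch is labelled `long` when every value is a whole number and `double` otherwise, which mirrors the behavior described in the question; `infer_type` and `enforce_double` are hypothetical helpers. Within Spark, the usual way around this is to pass an explicit schema to the JSON reader (`spark.read.schema(...).json(...)`) instead of relying on per-batch inference, so every batch is read with the same types.

```python
import json
from typing import List

def infer_type(values: List[object]) -> str:
    """Label a batch 'long' if every value parsed as an int, else 'double'."""
    # json.loads turns 3 into an int but 3.5 (or 3.0) into a float.
    return "long" if all(isinstance(v, int) for v in values) else "double"

def enforce_double(values: List[object]) -> List[float]:
    """Cast every value to float so all batches share one declared type."""
    return [float(v) for v in values]

batch_a = json.loads("[1.5, 2, 3.25]")
batch_b = json.loads("[4, 5, 6]")  # happens to contain only whole numbers
print(infer_type(batch_a), infer_type(batch_b))  # double long
print(enforce_double(batch_b))                   # [4.0, 5.0, 6.0]
```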

In that particular case, I think I decided the data wasn't feasible to recover. It might be, but I decided to stop spending time on it and rebuilt the data from source. It wasn't a terribly large amount of data so that was feasible in that case.

In other cases, I have written "schema change" scripts to go through the old dataframe, change the schema and save it as a new dataframe with the new schema.
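A schema-change script of this kind boils down to casting each affected column to its target type and writing the result to a new location. As a sketch, assuming PySpark, the cast expressions could be generated from the current `df.dtypes` and a map of target types, then passed to `DataFrame.selectExpr`; `cast_expressions` and the column names are hypothetical.

```python
from typing import Dict, List, Tuple

def cast_expressions(dtypes: List[Tuple[str, str]],
                     targets: Dict[str, str]) -> List[str]:
    """Build one SQL expression per column, casting only the columns whose
    current type differs from the requested target type."""
    exprs = []
    for name, current in dtypes:
        target = targets.get(name, current)
        if target == current:
            exprs.append(f"`{name}`")
        else:
            exprs.append(f"CAST(`{name}` AS {target}) AS `{name}`")
    return exprs

old_dtypes = [("Id", "int"), ("amount", "double"), ("day", "date")]
print(cast_expressions(old_dtypes, {"Id": "bigint"}))
# ['CAST(`Id` AS bigint) AS `Id`', '`amount`', '`day`']
```

In a notebook this might be used as `old_df.selectExpr(*cast_expressions(old_df.dtypes, {"Id": "bigint"})).write.parquet(new_path)`, where `old_df` and `new_path` are placeholders. Generating the expressions from `dtypes` keeps every untouched column in its original order.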

To make recovery or schema changes feasible in cases like this in the future, I now save all data frames into subfolders that follow the "/key=..." naming convention. So the path looks like the following:

path = METRICS_DATA_FRAME_FILE_NAME + '/key={week}_{product}_{version}_{timestamp}'.format(week=self.week, product=self.product, version=latestVersion, timestamp=timestamp ) 

This lets the whole data frame be loaded with a single read of METRICS_DATA_FRAME_FILE_NAME, while the output of each job, which may introduce a breaking schema change, can also be loaded independently.
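The naming scheme works because Spark's partition discovery treats a `key=value` directory as a value of a partition column named `key`, so reading the parent folder yields one data frame, while any single job folder can still be read on its own. Below is a small sketch of building such a path and recovering its parts; the base path, the sample values, and the `job_path` and `parse_key` helpers are hypothetical, and splitting on `_` assumes none of the four fields contains an underscore.

```python
from typing import Dict

BASE_PATH = "/mnt/metrics/metrics_df"  # hypothetical stand-in for METRICS_DATA_FRAME_FILE_NAME

def job_path(week: str, product: str, version: str, timestamp: str) -> str:
    """One subfolder per job, following the key=... convention from the post."""
    return BASE_PATH + "/key={week}_{product}_{version}_{timestamp}".format(
        week=week, product=product, version=version, timestamp=timestamp)

def parse_key(path: str) -> Dict[str, str]:
    """Recover the four fields from a job folder path.

    Assumes none of the fields contains an underscore."""
    value = path.rsplit("/key=", 1)[1]
    week, product, version, timestamp = value.split("_")
    return {"week": week, "product": product,
            "version": version, "timestamp": timestamp}

p = job_path("2019-W14", "web", "3", "20190405T1200")
print(p)                        # /mnt/metrics/metrics_df/key=2019-W14_web_3_20190405T1200
print(parse_key(p)["version"])  # 3
```

With PySpark, `spark.read.parquet(BASE_PATH)` would read all jobs at once, while passing one job's path reads just that job, which is what makes per-job schema fixes possible.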