10-05-2016 06:10 AM
Sometimes the error message contains
"org.apache.spark.SparkException: Exception thrown in awaitResult:".
The source of the error is the step where we extract, for the second time, the rows whose data has been updated. We can count these rows, but we cannot display them or write them to a table.
Our approach is:
We import our data day by day: up to 500,000 rows and, for a complete depiction of the required information, up to 105 columns, although we have already reduced our table to a minimum of 8 columns.
We get both new data and updates.
Instead of overwriting existing rows with our updates, we would like to append them as the incremental change relative to the existing rows.
We import one initial table.
We run the update/append algorithm once successfully; all join, union, and write commands work.
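For context, the increment computation looks roughly like the following Scala sketch. This is only an illustration under assumed names: the table names "base_table" and "daily_load", the key "Id", the single measure column "amount", and the output path are invented, and the table is assumed to consist of just those two columns. Our real code follows further down.

import org.apache.spark.sql.functions.col

// Hypothetical tables: "base_table" holds the current state, "daily_load" the new import.
val existing = sqlContext.table("base_table")
val updates = sqlContext.table("daily_load")

// Join the updates to the existing rows on the key and compute the change per
// measure column, rather than overwriting the existing rows.
val increments = updates.as("u")
  .join(existing.as("e"), col("u.Id") === col("e.Id"))
  .select(col("u.Id"), (col("u.amount") - col("e.amount")).as("amount"))

// Append the per-day increments (here to a hypothetical Parquet path).
increments.write.mode("append").parquet("/data/base_table_increments")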
Our table schema is as follows: the first five columns and the last column are what we call dimensions; the other columns are the basis for the increment computation.
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.col

// statusIdsChanged consists only of the Id column (the Ids of the updated rows)
val statusIdsChanged = {
  val statusRowsReduced = sqlContext.table(BASE_TABLE_NAME).select("Id")
  statusRowsReduced.join(updatedBetId, statusRowsReduced("Id") === updatedBetId("updateId")).drop("updateId")
}
val status = sqlContext.table(BASE_TABLE_NAME)
status.count()
// Split the column names of a DataFrame at the given column name
def splitNamesAt(df: org.apache.spark.sql.Dataset[Row], splitName: String) = {
  val cols = df.columns
  cols.splitAt(cols.indexOf(splitName))
}
val (dimensionsStatus, incrementsStatus) = splitNamesAt(status, "Id")
// The following value is the error source.
val statusRowsChanged = status.join(statusIdsChanged, "Id")
  .select(incrementsStatus.map(col): _*)
  .drop("deliveryDay")
10-29-2016 10:07 PM
I am running into this as well and the problem seems intermittent. Have you found a solution?
11-09-2016 11:41 PM
@activescott Yes, we did. We found out that in our case certain data types cause the problem. In addition, the error occurs while reading data in Spark, e.g. after a transformation, but not always at the same point in the code. The root cause is how the data is written. We overcame the problem as follows:
1. Check which data types our base data is automatically cast to during the "load into Spark" step. For instance, in our case Spark had problems reading the SQL integer type.
2. Pay attention to how the Spark functions or your UDFs change the data types. For instance, we ran into the error because the precision or scale of our decimals changed.
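To illustrate both points, here is a hedged Scala sketch, assuming the Spark 1.x-style sqlContext used elsewhere in this thread; the table name "base_table", the decimal column "amount", and the output path are invented. It inspects the schema Spark actually inferred and pins the decimal to an explicit precision and scale before writing, so every write uses the same type:

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.DecimalType

val df = sqlContext.table("base_table")

// 1. Check which data types Spark inferred during the load.
df.printSchema()

// 2. Pin the decimal column to a fixed precision/scale so that functions or
//    UDFs cannot silently change it between writes.
val pinned = df.withColumn("amount", col("amount").cast(DecimalType(18, 2)))
pinned.write.mode("append").parquet("/tmp/base_table_parquet")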
11-10-2016 09:04 PM
Thanks Lleido. I eventually found that I had inadvertently changed the schema of a partitioned DataFrame by narrowing a column's type from long to integer. While the cause is rather obvious in hindsight, it was terribly difficult to troubleshoot at first because it was inconsistent. In the end, I found that turning on the "mergeSchema" option, which forces the schema to merge across all partitions, made the problem manifest immediately rather than intermittently as different parts of the code ran (and accessed different previously saved DataFrames). To merge the schema:
sqlContext.read.option('mergeSchema', True).parquet(...)
12-07-2016 01:52 AM
So you added "mergeSchema" so that you would see the error immediately when some column types do not match? But how did you fix the actual problem of the same column having different types across sets of data with the same structure? For example, I have raw JSON data with a column that is supposed to be of double type, but if a set happens to contain only whole numbers, they are written in the JSON without decimal points, so Spark does not know the column has to be double and reads it as long. When I then write that data as Parquet and later try to read it together with the other sets, it fails.
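For illustration, a hedged Scala sketch of the situation described above (the paths, field names, and the workaround of passing an explicit schema are assumptions, not part of the original post): schema inference turns a whole-number-only batch into long, while an explicit schema keeps the column double in every batch, so the resulting Parquet files stay compatible.

import org.apache.spark.sql.types.{DoubleType, StringType, StructField, StructType}

// Inference: a batch containing only whole numbers comes back as long.
val inferred = sqlContext.read.json("/data/raw/batch1.json")
inferred.printSchema()  // value: long

// An explicit schema keeps the column double regardless of the batch contents.
val schema = StructType(Seq(
  StructField("id", StringType),
  StructField("value", DoubleType)
))
val typed = sqlContext.read.schema(schema).json("/data/raw/batch1.json")
typed.write.mode("append").parquet("/data/parquet/values")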
12-07-2016 08:40 AM
In that particular case, I decided the data wasn't worth recovering. It might have been recoverable, but I stopped spending time on it and rebuilt the data from source. It wasn't a terribly large amount of data, so that was practical in that case.
In other cases, I have written "schema change" scripts to go through the old dataframe, change the schema and save it as a new dataframe with the new schema.
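Roughly, such a schema-change script can look like the following sketch (written in Scala only for consistency with the code earlier in the thread; the paths and the widened column "count" are invented):

import org.apache.spark.sql.functions.col

// One-off migration: read the old dataframe, widen the narrowed column back
// to long, and save the result under a new path with the corrected schema.
val oldDf = sqlContext.read.parquet("/data/metrics_v1")
val migrated = oldDf.withColumn("count", col("count").cast("long"))
migrated.write.parquet("/data/metrics_v2")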
12-07-2016 08:42 AM
To make it feasible to recover or apply a schema change in cases like this in the future, I now save all DataFrames into subfolders using the "/key=..." naming convention in the path. The path looks like the following:
path = METRICS_DATA_FRAME_FILE_NAME + '/key={week}_{product}_{version}_{timestamp}'.format(week=self.week, product=self.product, version=latestVersion, timestamp=timestamp)
This lets the whole DataFrame be loaded with a single read from "METRICS_DATA_FRAME_FILE_NAME", while each job's output (which may introduce a breaking schema change) can also be loaded independently.
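Sketched in Scala (again only for consistency with the earlier code; the root path, source table, and key value are invented), the layout behaves roughly like this:

// Each job writes its output into its own key= subfolder.
val metricsRoot = "/data/metrics_df"
val jobOutput = sqlContext.table("weekly_metrics")  // hypothetical source
jobOutput.write.parquet(s"$metricsRoot/key=201649_productA_3_1481100000")

// The whole dataframe loads with a single read (Spark exposes "key" as a
// partition column), and mergeSchema surfaces any type mismatch immediately.
val all = sqlContext.read.option("mergeSchema", "true").parquet(metricsRoot)

// A single job's output can still be loaded (and migrated) independently.
val one = sqlContext.read.parquet(s"$metricsRoot/key=201649_productA_3_1481100000")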