08-23-2015 12:20 PM
Hello,
I am attempting to append new json files into an existing parquet table defined in Databricks.
Using a dataset defined by this command (dataframe initially added to a temp table):
val output = sql("""
  select
    headers.event_name,
    to_date(from_unixtime(headers.received_timestamp)) as dt,
    from_unixtime(headers.received_timestamp) as login_datetime,
    headers.ip_address,
    headers.acting_user_id
  from usersloggedInRaw_tmp
""")
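(For completeness, the temp table above was registered roughly like this - the JSON path below is just a placeholder, not the real location:)
// placeholder source path; the real S3 location is omitted here
val raw = sqlContext.read.json("s3://some-bucket/users-logged-in/*.json")
raw.registerTempTable("usersloggedInRaw_tmp")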
I create the initial table with the following:
output.write.format("parquet").partitionBy("dt").saveAsTable("dev_sessions")
The output of this table looks like the following:
If I try to append a new json file to the now-existing 'dev_sessions' table, using the following:
output.write.mode("append").format("parquet").partitionBy("dt").saveAsTable("dev_sessions")
Here is what I see:
The dataset seems to 'shift'. For example, the acting_user_id value is now populating the 'dt' column, the column used in the append command to partition the data.
I have tried this flow multiple times and can reproduce the same result. Is this a bug in dataframe.write(), or am I making a mistake somewhere? Note that prior to appending to the table, I inspect the 'output' dataframe in Databricks via the display() command and there are no issues - the values are in their expected columns. It is only after appending to the table using the write command that the issue occurs.
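(Reading the table back after the append shows the problem directly:)
// after the append, the dt column comes back holding acting_user_id values
val check = sql("select dt, acting_user_id, event_name from dev_sessions")
display(check)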
Any help that can be provided would be sincerely appreciated.
09-04-2015 10:19 AM
This seems to be a corner case which no one has reported. If you can send a link to your notebook and verbally authorize me to view/run it, I can take a look at the issue.
09-06-2015 12:58 PM
Thanks for responding. However, I have since moved away from this approach and made heavy modifications to the notebook of interest, as I did not see a way to define a specific path in conjunction with saveAsTable; I would like it to point to an S3 directory of my choosing. Therefore I am now using "sqlContext.createExternalTable(tableName, warehouseDirectory)" in conjunction with "sqlContext.refreshTable(tableName)". With this approach, I do not get the same issue: when adding new data, no 'shifting' of the data takes place.
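(Roughly, the new flow looks like this - the table name and S3 path are placeholders, and the exact ordering may differ from my notebook:)
val warehouseDirectory = "s3n://my-bucket/warehouse/dev_sessions"   // placeholder path
// initial load: write partitioned parquet straight into the chosen S3 directory
output.write.format("parquet").partitionBy("dt").save(warehouseDirectory)
// one-time setup: point an external table at that directory
sqlContext.createExternalTable("dev_sessions", warehouseDirectory)
// subsequent loads: append more parquet files, then refresh so the table picks them up
newOutput.write.mode("append").format("parquet").partitionBy("dt").save(warehouseDirectory)
sqlContext.refreshTable("dev_sessions")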
09-12-2015 10:10 AM
Just updating this question with the latest status. Omoshiroi says he found a workaround:
I have since moved away from this approach and made heavy modifications to the notebook of interest, as I did not see a way to define a specific path in conjunction with saveAsTable; I would like it to point to an S3 directory of my choosing. Therefore I am now using "sqlContext.createExternalTable(tableName, warehouseDirectory)" in conjunction with "sqlContext.refreshTable(tableName)". With this approach, I do not get the same issue: when adding new data, no 'shifting' of the data takes place.
If anyone runs into the issue above though - let us know - if we can get a reliable reproduction - we'd love to debug and fix that.
10-30-2015 04:42 AM
I can reproduce this quite simply here.
case class Thing(first: String, second: String)
val df = sqlContext.createDataFrame(Seq(Thing("one", "two")))
// run the identical write twice: the first call creates the table,
// the second (append) call is what produces the shifted partition layout below
df.write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("first").saveAsTable("example")
df.write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("first").saveAsTable("example")
Results in the following on disk:
-rw-r--r--. 1 dev dev 211 Oct 30 11:37 _common_metadata
drwxrwxr-x. 2 dev dev 4096 Oct 30 11:37 first=one
drwxrwxr-x. 2 dev dev 4096 Oct 30 11:37 first=two
-rw-r--r--. 1 dev dev 459 Oct 30 11:37 _metadata
-rw-r--r--. 1 dev dev 0 Oct 30 11:37 _SUCCESS
Tried on Spark 1.5.0 and 1.5.1.
10-30-2015 10:12 AM
Thanks, Clive - I can reproduce the same issue, so I've filed a bug with the Databricks open source team to take a look.
10-30-2015 11:04 AM
Thanks. I've gone with the workaround for now.
Can you give me a link to the bug?
10-30-2015 11:27 AM
I'm not sure if our internal JIRA is accessible externally. But here's the link:
11-02-2015 02:25 AM
I can't see it, but thanks.
11-02-2015 09:33 AM
There's a PR now out with a fix:
11-03-2015 12:12 AM
Nice.
Thanks.
04-20-2016 11:46 AM
We came across a similar situation. We are using Spark 1.6.1 and have a daily load process that pulls data from Oracle and writes it out as parquet files. This works fine for 18 days of data (up to the 18th run); the problem comes on the 19th run, where the dataframe load job gets called multiple times and never completes. When we delete all of the partitioned data and run just day 19 on its own, it works, which proves there is no issue with the data itself. How can we proceed with this - does disabling the metadata help? If so, can we run into issues once we have more than 500 partitions?
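By 'disabling the metadata' I mean switching off the Parquet summary files, i.e. something along these lines (generic Spark/Parquet settings, not yet tried here):
// stop writing the _metadata/_common_metadata summary files alongside the parquet output
sc.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")
// keep schema merging off on read, which can get expensive with many partitions
sqlContext.setConf("spark.sql.parquet.mergeSchema", "false")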