cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Dataframe Write Append to Parquet Table - Partition Issue

RobertWalsh
New Contributor II

Hello,

I am attempting to append new json files into an existing parquet table defined in Databricks.

Using a dataset defined by this command (dataframe initially added to a temp table):

val output = sql("select headers.event_name, to_date(from_unixtime(headers.received_timestamp)) as dt, from_unixtime(headers.received_timestamp) AS login_datetime, headers.ip_address, headers.acting_user_id from usersloggedInRaw_tmp")

I create the initial table with the following:

output.write.format("parquet").partitionBy("dt").saveAsTable("dev_sessions")

This output of this table looks like the following:

0693f000007OoJYAA0

If I try to append a new json file to the now existing 'dev_session' table, using the following:

output.write.mode("append").format("parquet").partitionBy("dt").saveAsTable("dev_sessions")

Here is what I see:

0693f000007OoJZAA0

The dataset seems to 'shift'. For example, the acting_user_id value is now populating the 'dt' column, the column used in the append command to partition the data.

I have tried this flow multiple times and can reproduce the same result. Is this a bug in dataframe.write(), or am I making a mistake somewhere? Note that prior to appending the table, I inspect the 'output' dataframe in databricks via the display() command and there is no issues - the values are in their expected columns. It is only after appending to the table using the write command that the issue seems to occur.

Any help that can be provided would be sincerely appreciated.

11 REPLIES 11

User16301467532
New Contributor II

This seems to be a corner case which no one has reported. If you can send a link to your notebook and verbally authorize me to view/run it, I can take a look at the issue.

Thanks for responding. However, I have since moved away from this approach and made heavy modifications to the notebook of interest as I did not see a way to define a specific path in conjunction with saveAsTable. I would like it to point to a S3 directory of my choosing. Therefore I am now using "sqlContext.createExternalTable(tableName, warehouseDirectory)" in conjunction with "sqlContext.refreshTable(tableName)". With this approach, I do not get the same issue. When adding new data, no 'shifting' of the data takes place.

vida
Contributor II
Contributor II

Just updating this question with the latest update. Omoshiroi says he found a workaround:

I have since moved away from this approach and made heavy modifications to the notebook of interest as I did not see a way to define a specific path in conjunction with saveAsTable. I would like it to point to a S3 directory of my choosing. Therefore I am now using "sqlContext.createExternalTable(tableName, warehouseDirectory)" in conjunction with "sqlContext.refreshTable(tableName)". With this approach, I do not get the same issue. When adding new data, no 'shifting' of the data takes place.

If anyone runs into the issue above though - let us know - if we can get a reliable reproduction - we'd love to debug and fix that.

CliveEvans
New Contributor III

I can reproduce this quite simply here.

case class Thing(first: String, second: String)

val df = sqlContext.createDataFrame(Seq(Thing("one","two")))

df.write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("first").saveAsTable("example") df.write.format("parquet").mode(org.apache.spark.sql.SaveMode.Append).partitionBy("first").saveAsTable("example")

Results in the following on disk:

-rw-r--r--. 1 dev dev  211 Oct 30 11:37 _common_metadata
drwxrwxr-x. 2 dev dev 4096 Oct 30 11:37 first=one
drwxrwxr-x. 2 dev dev 4096 Oct 30 11:37 first=two
-rw-r--r--. 1 dev dev  459 Oct 30 11:37 _metadata
-rw-r--r--. 1 dev dev    0 Oct 30 11:37 _SUCCES

Tried on 1.5.0 and 1.5.1

vida
Contributor II
Contributor II

Thanks, Clive - I can reproduce the same issue - so I filed a bug to the Databricks open source team to take a look.

CliveEvans
New Contributor III

Thanks. I've gone with the workaround for now.

Can you give me a link to the bug?

vida
Contributor II
Contributor II

I'm not sure if our internal JIRA is accessible externally. But here's the link:

https://databricks.atlassian.net/browse/SC-1029

CliveEvans
New Contributor III

I can't see it, but thanks.

vida
Contributor II
Contributor II

There's a PR now out with a fix:

https://github.com/apache/spark/pull/9408/files

CliveEvans
New Contributor III

Nice.

Thanks.

anil_s_langote
New Contributor II

We came across similar situation we are using spark 1.6.1, we have a daily load process to pull data from oracle and write as parquet files, this works fine for 18 days of data (till 18th run), the problem comes after 19th run where the data frame load job getting called multiple times and it never completes, when we delete all the partitioned data and run just for 19 day it works which proves that there is no issue data. How can we proceed with this, is disabling the metadata helps? if yes then can we run into issues when we have more than 500 partitions?

Connect with Databricks Users in Your Area

Join a Regional User Group to connect with local Databricks users. Events will be happening in your city, and you won’t want to miss the chance to attend and share knowledge.

If there isn’t a group near you, start one and help create a community that brings people together.

Request a New Group