Context:
I'm using DataFrameWriter to load a Dataset into Redshift. DataFrameWriter first writes the Dataset to S3, then loads the data from S3 into Redshift by issuing a Redshift COPY command.
Issue:
Intermittently we observe that the data is present in S3 but has not been loaded into Redshift by DataFrameWriter, and in these situations the DataFrameWriter.save() operation succeeds without any error.
The latest DataFrameWriter documentation says that data could be missing or incomplete due to S3's eventual consistency in listing operations. However, S3's official documentation says eventual consistency applies only to bucket deletions and to versioned buckets, so I don't have more information on how this relates to DataFrameWriter's claim that data could go missing.
Documentation excerpts:
DataFrameWriter documentation (https://docs.databricks.com/external-data/amazon-redshift.html): “When reading from and writing to Redshift, the data source reads and writes data in S3. Both Spark and Redshift produce partitioned output and store it in multiple files in S3. According to the Amazon S3 Data Consistency Model documentation, S3 bucket listing operations are eventually-consistent, so the files must go to special lengths to avoid missing or incomplete data due to this source of eventual-consistency.”
S3 Data Consistency Model documentation (https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel)
Ask:
- Is this expected behaviour of DataFrameWriter: even though the DataFrameWriter.save() operation succeeded without any error, is there a chance the data was not loaded into Redshift?
- How to deal with this situation?
Code:
// dataframe is a Dataset<Row>.
DataFrameWriter<Row> writer = dataframe.write()
.mode(SaveMode.Append)
.format("io.github.spark_redshift_community.spark.redshift")
.option("url", this.config.getUrl())
.option("user", username)
.option("password", password)
.option("tempdir", this.config.getTempS3Dir())
.option("aws_iam_role", iamRole)
.option("ssl", "true")
.option("sslfactory", "org.redshift.ssl.NonValidatingFactory")
.option("tempformat", DEFAULT_TEMP_FORMAT)
.option("dbtable", tableName);
writer.save();
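One defensive workaround I'm considering (a sketch of my own, not something from the spark-redshift documentation) is to count the rows in the target table over a plain JDBC connection before and after save(), and compare the delta to dataframe.count(). The class and method names below are my own; only the standard java.sql API is assumed.

```java
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

public class LoadVerifier {

    // Returns the current row count of the given table via JDBC.
    // Assumes tableName comes from trusted configuration, not user input.
    static long countRows(Connection conn, String tableName) throws SQLException {
        try (Statement st = conn.createStatement();
             ResultSet rs = st.executeQuery("SELECT COUNT(*) FROM " + tableName)) {
            rs.next();
            return rs.getLong(1);
        }
    }

    // Pure check: did the append land exactly the expected number of rows?
    static boolean appendLandedFully(long countBefore, long countAfter, long rowsWritten) {
        return countAfter - countBefore == rowsWritten;
    }
}
```

The idea would be: record countRows(...) and dataframe.count() before calling writer.save(), call countRows(...) again afterwards, and fail the job (or retry) when appendLandedFully(...) returns false, instead of trusting that save() returning normally means the COPY loaded everything. This only works cleanly if no other writer appends to the table concurrently.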