09-12-2024 08:42 AM
Thanks @menotron for your reply!
Interestingly, we had been using the REFRESH TABLE command even before this issue, and it worked well until now. With the new runtime, however, it no longer works. I should have specified the code we use. It actually fails in several variants of our code:
1) refresh+reading hive table
spark.sql('refresh table ' + table_name)
df = spark.table(table_name)
2) reading directly from storage
df = spark.read.parquet("dbfs:/mnt/prod_profiles/parsed/personal")
3) reading data as a stream (this is old code from before Auto Loader was in place); the error occurs when writing the data to Delta
df = spark.readStream.format('parquet').schema(schema).option('path', input_path).load()
(df.writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_path)
    .option("path", target_path)
    .option("mode", "append")
    .trigger(once=True)
    .start())
About your last suggestion, I am not sure if I understand exactly what you mean:
If Delta cache is stale or the underlying files have been removed, you can invalidate Delta cache manually by restarting the cluster.
We have this code as part of our pipeline, so we are looking for an automated solution. For now, we are trying a simple retry, but I don't think it is ideal because it doesn't guarantee 100% success and can make our pipeline run longer. Another thing we tried (without high hopes that it would actually help) was an even higher runtime, 15.4 LTS ML. The result was the same...
Any other suggestions would be highly appreciated, if these additional details help explain the issue better.
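For reference, a retry of this kind can be sketched roughly as below. This is only a minimal illustration, not our exact pipeline code: `read_with_retry`, `read_fn`, `refresh_fn`, and the default attempt count and delay are hypothetical names and values chosen for the example, and catching a bare `Exception` is an assumption that should be narrowed to the actual stale-file error.

```python
import time


def read_with_retry(read_fn, refresh_fn=None, max_attempts=3,
                    base_delay_s=1.0, sleep=time.sleep):
    """Call read_fn(), retrying with exponential backoff when it raises.

    read_fn    -- hypothetical zero-argument callable that performs the read
    refresh_fn -- optional callable run before each retry (e.g. a table refresh)
    sleep      -- injectable so the helper can be tested without waiting
    """
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return read_fn()
        except Exception as err:  # assumption: narrow this to the real error type
            last_error = err
            if attempt == max_attempts:
                break
            if refresh_fn is not None:
                refresh_fn()
            # Delays grow as base, 2*base, 4*base, ...
            sleep(base_delay_s * 2 ** (attempt - 1))
    # All attempts failed: surface the last error to the caller.
    raise last_error


# Usage with the first variant above (requires a live Spark session):
# df = read_with_retry(
#     lambda: spark.table(table_name),
#     refresh_fn=lambda: spark.sql('refresh table ' + table_name),
# )
```

As noted, this only reduces the chance of failure: if the files are still missing after the last attempt, the error is raised anyway, and every retry adds to the pipeline run time.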