InconsistentReadException: The file might have been updated during query - CSV-backed table
11-27-2023 03:01 PM
I have some CSV files that I upload to DBFS storage several times a day. From these CSVs, I have created SQL tables:
CREATE TABLE IF NOT EXISTS masterdata.lookup_host
USING CSV
OPTIONS (header "true", inferSchema "true")
LOCATION '/mnt/masterdata/assets-lookup.csv';
This works well for short queries but occasionally a long-running query will be disrupted when the underlying CSV is updated.
23/11/27 21:09:57 ERROR Executor: Exception in task 2.0 in stage 339.0 (TID 1167)
com.databricks.sql.io.FileReadException: Error while reading file dbfs:/mnt/masterdata/assets-lookup.csv.
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.logFileNameAndThrow(FileScanRDD.scala:695)
.....
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: com.databricks.common.filesystem.InconsistentReadException: The file might have been updated during query execution. Ensure that no pipeline updates existing files during query execution and try again.
This may be the result of this change:
Databricks Runtime returns an error if a file is modified between query planning and invocation
Databricks Runtime queries now return an error if a file is updated between query planning and invocation. Before this change, Databricks Runtime would read a file between these stages, which occasionally led to unpredictable results.
Is there a way to accept/ignore this file change and continue query execution?
Is there a better way to keep simple master/lookup data available for SQL joins and subqueries?
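One possible alternative, sketched below as an untested example and not taken from the replies in this thread: snapshot the CSV into a managed table each time a new file is uploaded, so long-running queries join against the snapshot rather than the file the upload pipeline keeps replacing. The snapshot table name is hypothetical.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()  // already defined in a Databricks notebook

// Read the mutable CSV once, with the same options as the CSV-backed table above.
val lookup = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/mnt/masterdata/assets-lookup.csv")

// Overwrite a managed snapshot table; queries join against this copy,
// whose contents only change when this snapshot step runs.
// "masterdata.lookup_host_snapshot" is a hypothetical table name.
lookup.write
  .mode("overwrite")
  .saveAsTable("masterdata.lookup_host_snapshot")
The snapshot table can then be used for SQL joins and subqueries exactly like the CSV-backed table, but it is decoupled from updates to the underlying file.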
11-28-2023 04:16 AM
One approach I'm testing (positive results so far, but it's still early):
%sql
-- Prep and cleanup
REFRESH TABLE masterdata.lookup_host;
DROP TABLE IF EXISTS t_hosts;
-- Forcibly cache the needed columns before using the data in another query.
CACHE TABLE t_hosts
OPTIONS ('storageLevel' 'DISK_ONLY')
SELECT identifier, category, domain
FROM masterdata.lookup_host lh
WHERE lh.category = 'workstation';
SELECT * FROM t_hosts;
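For reference, a rough DataFrame-side sketch of the same idea, assuming the table and column names from the snippet above and the ambient spark session of a Databricks notebook: project the needed columns, pin a disk-only copy, and force materialization with an action before the long-running query starts.
import org.apache.spark.sql.functions.col
import org.apache.spark.storage.StorageLevel

// Project only the needed columns and persist a disk-only copy,
// so downstream queries read the cached data instead of the CSV.
val hosts = spark.table("masterdata.lookup_host")
  .filter(col("category") === "workstation")
  .select("identifier", "category", "domain")
  .persist(StorageLevel.DISK_ONLY)

hosts.count()                             // action: materialize the cache now
hosts.createOrReplaceTempView("t_hosts")  // exposes it to SQL, like the CACHE TABLE version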
06-07-2024 12:40 AM
Hello, I want to reopen this issue, since I am facing the same error in our production environment, have not been able to solve it, and would like to ask for help.
Here is the error message I received:
Error while reading file dbfs:/mnt/dynamics/model.json.
Caused by: FileReadException: Error while reading file dbfs:/mnt/dynamics/model.json.
Caused by: InconsistentReadException: The file might have been updated during query execution. Ensure that no pipeline updates existing files during query execution and try again.
Caused by: IOException: Operation failed: "The condition specified using HTTP conditional header(s) is not met.", 412, GET, https://dynamics.dfs.core.windows.net/dataverse/model.json?timeout=90, ConditionNotMet, "The condition specified using HTTP conditional header(s) is not met. RequestId:78fa238b-501f-............ Time:2024-06-06T21:05:33.5168118Z"
Caused by: AbfsRestOperationException: Operation failed: "The condition specified using HTTP conditional header(s) is not met.", 412, GET, https://dynamics.dfs.core.windows.net/dataverse/model.json?timeout=90, ConditionNotMet, "The condition specified using HTTP conditional header(s) is not met. RequestId:78fa238b-............ Time:2024-06-06T21:05:33.5168118Z"
So apparently, while the table is being processed, model.json changes - it is changed in real time by the source, so this is expected.
What I want is to simply ignore the change and not throw an error - just have it take model.json as it is at the time of processing.
I tried to cache the DataFrame as last suggested, but it doesn't work and still throws the same error.
This is what I added:
import org.apache.spark.storage.StorageLevel
val abcd = spark.read.json("somePath/model.json")
abcd.persist(StorageLevel.DISK_ONLY)
Please suggest how to deal with this error.
Thank you.
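One thing worth noting about the snippet above: persist() is lazy, so nothing is cached until an action runs; until then, every query still goes back to model.json and can hit the same 412/InconsistentReadException. A minimal sketch (the path is illustrative) that forces materialization immediately after the read; whether it avoids the error still depends on the file not changing during that one read.
import org.apache.spark.storage.StorageLevel

// Illustrative path. persist() only marks the DataFrame for caching;
// the action below is what actually reads the file and builds the cached copy.
val abcd = spark.read.json("dbfs:/mnt/dynamics/model.json")
abcd.persist(StorageLevel.DISK_ONLY)
abcd.count()   // force the read now, while the file is in a single consistent state

// Subsequent queries against abcd should then use the cached copy
// rather than re-reading model.json.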

