InconsistentReadException: The file might have been updated during query - CSV backed table

hukel
Contributor

I have some CSV files that I upload to DBFS storage several times a day. From these CSVs, I have created SQL tables:

 

CREATE TABLE IF NOT EXISTS masterdata.lookup_host
USING CSV
OPTIONS (header "true", inferSchema "true")
LOCATION '/mnt/masterdata/assets-lookup.csv';

 

This works well for short queries, but occasionally a long-running query is disrupted when the underlying CSV is updated:

 

23/11/27 21:09:57 ERROR Executor: Exception in task 2.0 in stage 339.0 (TID 1167)
com.databricks.sql.io.FileReadException: Error while reading file dbfs:/mnt/masterdata/assets-lookup.csv.
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.logFileNameAndThrow(FileScanRDD.scala:695)
.....
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: com.databricks.common.filesystem.InconsistentReadException: The file might have been updated during query execution. Ensure that no pipeline updates existing files during query execution and try again.

 

This may be the result of this change:

https://docs.databricks.com/en/release-notes/runtime/13.3lts.html#databricks-runtime-returns-an-erro...

Databricks Runtime returns an error if a file is modified between query planning and invocation

Databricks Runtime queries now return an error if a file is updated between query planning and invocation. Before this change, Databricks Runtime would read a file between these stages, which occasionally led to unpredictable results.

Is there a way to accept/ignore this file change and continue query execution?

Is there a better way to keep simple master/lookup data available for SQL joins and subqueries?

hukel
Contributor

One approach I'm testing (positive results so far, but still early).

%sql
-- Prep and cleanup
REFRESH TABLE masterdata.lookup_host;
DROP TABLE IF EXISTS t_hosts;

-- Forcibly cache the needed columns before using the data in another query.
CACHE TABLE t_hosts
OPTIONS ('storageLevel' 'DISK_ONLY')
SELECT identifier, category, domain
FROM masterdata.lookup_host lh
WHERE lh.category = 'workstation';

SELECT * FROM t_hosts;
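
Since the error message itself says to "try again", a complementary option is to wrap the long-running query in a small retry loop. The following is only a sketch in Python, not something tested on a cluster: `run_with_retry`, `run_query`, and `refresh` are hypothetical names, and it assumes the text of the InconsistentReadException shows up in `str(err)` of whatever exception the notebook raises.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")

# Substring taken from the error message quoted earlier in this thread.
RETRY_MARKER = "The file might have been updated during query execution"


def run_with_retry(run_query: Callable[[], T],
                   refresh: Optional[Callable[[], None]] = None,
                   attempts: int = 3,
                   delay_s: float = 5.0) -> T:
    """Run `run_query`, retrying only when the inconsistent-read error appears."""
    for attempt in range(1, attempts + 1):
        try:
            return run_query()
        except Exception as err:
            # Re-raise anything unrelated, and give up after the last attempt.
            if RETRY_MARKER not in str(err) or attempt == attempts:
                raise
            if refresh is not None:
                refresh()  # hypothetical hook, e.g. re-list the table's files
            time.sleep(delay_s)
    raise ValueError("attempts must be at least 1")
```

In a notebook, `run_query` might be something like `lambda: spark.sql("SELECT ...").collect()` and `refresh` might be `lambda: spark.sql("REFRESH TABLE masterdata.lookup_host")`. A retry only helps if uploads are infrequent compared with the query duration; it does not make the read consistent.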

Hello, I want to reopen this issue because I am facing the same error in our production environment. I have not been able to solve it and would like to ask for help.

Here is the error message I received:

Error while reading file dbfs:/mnt/dynamics/model.json.
Caused by: FileReadException: Error while reading file dbfs:/mnt/dynamics/model.json.
Caused by: InconsistentReadException: The file might have been updated during query execution. Ensure that no pipeline updates existing files during query execution and try again.
Caused by: IOException: Operation failed: "The condition specified using HTTP conditional header(s) is not met.", 412, GET, https://dynamics.dfs.core.windows.net/dataverse/model.json?timeout=90, ConditionNotMet, "The condition specified using HTTP conditional header(s) is not met. RequestId:78fa238b-501f-............ Time:2024-06-06T21:05:33.5168118Z"
Caused by: AbfsRestOperationException: Operation failed: "The condition specified using HTTP conditional header(s) is not met.", 412, GET, https://dynamics.dfs.core.windows.net/dataverse/model.json?timeout=90, ConditionNotMet, "The condition specified using HTTP conditional header(s) is not met. RequestId:78fa238b-............ Time:2024-06-06T21:05:33.5168118Z"

So apparently model.json changes while a table is being processed. The source updates it in real time, so this is expected.

What I want is to ignore the change instead of throwing an error, and just read model.json as it is at the time of processing.

I tried caching the DataFrame as suggested in the previous post, but it doesn't work and still throws the same error.

This is what I added:

import org.apache.spark.storage.StorageLevel

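// Read model.json (somePath is a placeholder for the real mount path).
val abcd = spark.read.json("somePath/model.json")
// Mark the DataFrame for on-disk caching; persist is lazy, so nothing is stored until an action runs.
abcd.persist(StorageLevel.DISK_ONLY)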

Please suggest how to deal with this error.

Thank you.
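
One idea that might help with "take model.json as it is at the time of processing" is to copy the file to a private snapshot path first and point the reader at the copy, so later updates to the live file no longer affect the query. Below is a minimal sketch in plain Python, assuming the storage is reachable as a local path; `snapshot_file` and the directory names are hypothetical. On Databricks the copy step might instead be done with `dbutils.fs.cp`.

```python
import json
import shutil
import time
from pathlib import Path


def snapshot_file(source: str, snapshot_dir: str) -> Path:
    """Copy `source` into `snapshot_dir` and return the path of the private copy."""
    target_dir = Path(snapshot_dir)
    target_dir.mkdir(parents=True, exist_ok=True)
    # A timestamp prefix keeps each snapshot distinct from earlier ones.
    target = target_dir / f"{time.time_ns()}_{Path(source).name}"
    shutil.copyfile(source, target)
    return target


def load_json_snapshot(source: str, snapshot_dir: str) -> dict:
    """Snapshot `source`, then parse the copy instead of the live file."""
    snapshot = snapshot_file(source, snapshot_dir)
    with open(snapshot, "r", encoding="utf-8") as handle:
        return json.load(handle)
```

The copy itself can still fail if the source is replaced mid-copy, but that window is much shorter than a full query, and old snapshots need to be cleaned up separately.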