Error Spark reading CSV from DBFS MNT: incompatible format detected
03-07-2024 02:12 PM
I am trying to follow along with a training course, but I consistently run into an error when loading a CSV with Spark from DBFS. Specifically, I keep getting an "Incompatible format detected" error. Has anyone else encountered this and found a solution? Code and error message below:
Code
file_path = f"dbfs:/mnt/dbacademy-datasets/scalable-machine-learning-with-apache-spark/v02/airbnb/sf-listings/sf-listings-2019-03-06.csv"
raw_df = spark.read.csv(file_path, header="true", inferSchema="true", multiLine="true", escape='"')
display(raw_df)
Error
AnalysisException: Incompatible format detected.
A transaction log for Delta was found at `dbfs://_delta_log`,
but you are trying to read from `dbfs:/mnt/dbacademy-datasets/scalable-machine-learning-with-apache-spark/v02/airbnb/sf-listings/sf-listings-2019-03-06.csv` using format("csv"). You must use
'format("delta")' when reading and writing to a delta table.
To disable this check, SET spark.databricks.delta.formatCheck.enabled=false
To learn more about Delta, see https://docs.databricks.com/delta/index.html
---------------------------------------------------------------------------
AnalysisException Traceback (most recent call last)
File <command-3615789235235519>:3
1 file_path = f"dbfs:/mnt/dbacademy-datasets/scalable-machine-learning-with-apache-spark/v02/airbnb/sf-listings/sf-listings-2019-03-06.csv"
----> 3 raw_df = spark.read.csv(file_path, header="true", inferSchema="true", multiLine="true", escape='"')
5 display(raw_df)
File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
46 start = time.perf_counter()
47 try:
---> 48 res = func(*args, **kwargs)
49 logger.log_success(
50 module_name, class_name, function_name, time.perf_counter() - start, signature
51 )
52 return res
File /databricks/spark/python/pyspark/sql/readwriter.py:729, in DataFrameReader.csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, unescapedQuoteHandling)
727 if type(path) == list:
728 assert self._spark._sc._jvm is not None
--> 729 return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
730 elif isinstance(path, RDD):
732 def func(iterator):
File /databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
1315 command = proto.CALL_COMMAND_NAME +\
1316 self.command_header +\
1317 args_command +\
1318 proto.END_COMMAND_PART
1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
1322 answer, self.gateway_client, self.target_id, self.name)
1324 for temp_arg in temp_args:
1325 temp_arg._detach()
File /databricks/spark/python/pyspark/errors/exceptions.py:234, in capture_sql_exception.<locals>.deco(*a, **kw)
230 converted = convert_exception(e.java_exception)
231 if not isinstance(converted, UnknownException):
232 # Hide where the exception came from that shows a non-Pythonic
233 # JVM exception message.
--> 234 raise converted from None
235 else:
236 raise
AnalysisException: Incompatible format detected.
A transaction log for Delta was found at `dbfs://_delta_log`,
but you are trying to read from `dbfs:/mnt/dbacademy-datasets/scalable-machine-learning-with-apache-spark/v02/airbnb/sf-listings/sf-listings-2019-03-06.csv` using format("csv"). You must use
'format("delta")' when reading and writing to a delta table.
To disable this check, SET spark.databricks.delta.formatCheck.enabled=false
To learn more about Delta, see https://docs.databricks.com/delta/index.html
03-07-2024 04:58 PM
Well, your error message is telling you that Spark is encountering a Delta table conflict while trying to read a CSV file. The file path dbfs:/mnt/dbacademy... points to a CSV file. This is where the fun begins: Spark detects a Delta transaction log (dbfs://_delta_log) in the same DBFS mount point. Since Delta tables have a specific on-disk format, Spark gives priority to the Delta format check and throws an error when you try to read the location as a CSV.
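One quick way to confirm what Spark is seeing is to list the path and check for a _delta_log entry. A minimal diagnostic sketch using dbutils.fs.ls (available in Databricks notebooks), reusing the file_path variable from your question; note this assumes the "CSV" path is actually a directory, since dbutils.fs.ls on a single file just returns that file's info:
Code
# Diagnostic sketch: check whether the "CSV" path is really a Delta table directory
entries = [f.name for f in dbutils.fs.ls(file_path)]
print(entries)  # a '_delta_log/' entry means Spark treats this location as a Delta table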
So you need to ascertain whether the path you are reading is actually a Delta table. If it is, use
raw_df = spark.read.format("delta").load(your_file_path)
Otherwise, ensure the CSV file name doesn't conflict with any existing Delta table in the same DBFS mount. Renaming the CSV file avoids the conflict.
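And if you are certain the data really is plain CSV, the error message itself shows the escape hatch: disable Delta's format check before the read. A sketch of that workaround (use with care, since it turns off a safety check):
Code
# Workaround suggested by the error message: disable the Delta format check, then read as CSV
spark.conf.set("spark.databricks.delta.formatCheck.enabled", "false")
raw_df = spark.read.csv(file_path, header=True, inferSchema=True, multiLine=True, escape='"')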
HTH
London
United Kingdom
View my LinkedIn profile
https://en.everybodywiki.com/Mich_Talebzadeh
Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice, "one test result is worth one-thousand expert opinions" (Wernher von Braun).

