
Error Spark reading CSV from DBFS MNT: incompatible format detected

Paul1
New Contributor

I am trying to follow along with a training course, but I am consistently running into an error loading a CSV with Spark from DBFS. Specifically, I keep getting an "Incompatible format detected" error. Has anyone else encountered this and found a solution? Code and error message below:

Code

file_path = f"dbfs:/mnt/dbacademy-datasets/scalable-machine-learning-with-apache-spark/v02/airbnb/sf-listings/sf-listings-2019-03-06.csv"

raw_df = spark.read.csv(file_path, header="true", inferSchema="true", multiLine="true", escape='"')

display(raw_df)

Error

---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-3615789235235519>:3
      1 file_path = f"dbfs:/mnt/dbacademy-datasets/scalable-machine-learning-with-apache-spark/v02/airbnb/sf-listings/sf-listings-2019-03-06.csv"
----> 3 raw_df = spark.read.csv(file_path, header="true", inferSchema="true", multiLine="true", escape='"')
      5 display(raw_df)

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 try:
---> 48     res = func(*args, **kwargs)
     49     logger.log_success(
     50         module_name, class_name, function_name, time.perf_counter() - start, signature
     51     )
     52     return res

File /databricks/spark/python/pyspark/sql/readwriter.py:729, in DataFrameReader.csv(self, path, schema, sep, encoding, quote, escape, comment, header, inferSchema, ignoreLeadingWhiteSpace, ignoreTrailingWhiteSpace, nullValue, nanValue, positiveInf, negativeInf, dateFormat, timestampFormat, maxColumns, maxCharsPerColumn, maxMalformedLogPerPartition, mode, columnNameOfCorruptRecord, multiLine, charToEscapeQuoteEscaping, samplingRatio, enforceSchema, emptyValue, locale, lineSep, pathGlobFilter, recursiveFileLookup, modifiedBefore, modifiedAfter, unescapedQuoteHandling)
    727 if type(path) == list:
    728     assert self._spark._sc._jvm is not None
--> 729     return self._df(self._jreader.csv(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    730 elif isinstance(path, RDD):
    732     def func(iterator):

File /databricks/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/java_gateway.py:1321, in JavaMember.__call__(self, *args)
   1315 command = proto.CALL_COMMAND_NAME +\
   1316     self.command_header +\
   1317     args_command +\
   1318     proto.END_COMMAND_PART
   1320 answer = self.gateway_client.send_command(command)
-> 1321 return_value = get_return_value(
   1322     answer, self.gateway_client, self.target_id, self.name)
   1324 for temp_arg in temp_args:
   1325     temp_arg._detach()

File /databricks/spark/python/pyspark/errors/exceptions.py:234, in capture_sql_exception.<locals>.deco(*a, **kw)
    230 converted = convert_exception(e.java_exception)
    231 if not isinstance(converted, UnknownException):
    232     # Hide where the exception came from that shows a non-Pythonic
    233     # JVM exception message.
--> 234     raise converted from None
    235 else:
    236     raise

AnalysisException: Incompatible format detected.

A transaction log for Delta was found at `dbfs://_delta_log`,
but you are trying to read from `dbfs:/mnt/dbacademy-datasets/scalable-machine-learning-with-apache-spark/v02/airbnb/sf-listings/sf-listings-2019-03-06.csv` using format("csv"). You must use
'format("delta")' when reading and writing to a delta table.

To disable this check, SET spark.databricks.delta.formatCheck.enabled=false
To learn more about Delta, see https://docs.databricks.com/delta/index.html

 

1 REPLY

MichTalebzadeh
Contributor III

Well, your error message is telling you that Spark has hit a Delta table conflict while trying to read a CSV file. The file path dbfs:/mnt/dbacademy... points to a CSV file. This is where the fun begins: Spark detects a Delta transaction log (dbfs://_delta_log) in the same DBFS mount, and since Delta tables have a specific on-disk format, Spark gives the Delta format check priority and throws an error when you try to read the path as CSV.
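A quick way to confirm what Spark is actually seeing is to list the path with dbutils.fs.ls. A minimal sketch, using the path from your post:

file_path = "dbfs:/mnt/dbacademy-datasets/scalable-machine-learning-with-apache-spark/v02/airbnb/sf-listings/sf-listings-2019-03-06.csv"
# If the ".csv" path is really a Delta table it will be a directory, and the
# listing will include a "_delta_log/" entry; a plain file lists only itself.
print([f.name for f in dbutils.fs.ls(file_path)])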

So first you need to ascertain whether the path you are reading is actually a Delta table. If it is, read it with
raw_df = spark.read.format("delta").load(your_file_path)

Otherwise, make sure the CSV file name doesn't conflict with an existing Delta table in the same DBFS mount. Renaming or moving the CSV avoids the conflict, as in the sketch below.
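For example (a sketch; the new file name below is hypothetical, any non-conflicting name will do):

src = "dbfs:/mnt/dbacademy-datasets/scalable-machine-learning-with-apache-spark/v02/airbnb/sf-listings/sf-listings-2019-03-06.csv"
dst = src.replace(".csv", "-raw.csv")  # hypothetical non-conflicting name
dbutils.fs.mv(src, dst, recurse=True)  # recurse=True in case the path is a directory

raw_df = spark.read.csv(dst, header="true", inferSchema="true", multiLine="true", escape='"')

And as the error message itself notes, you can disable the check for the session with SET spark.databricks.delta.formatCheck.enabled=false, but I would only do that if you are certain the data is not a Delta table.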

HTH

Mich Talebzadeh | Technologist | Data | Generative AI | Financial Fraud
London
United Kingdom

view my LinkedIn profile

https://en.everybodywiki.com/Mich_Talebzadeh
Disclaimer: The information provided is correct to the best of my knowledge but of course cannot be guaranteed. It is essential to note that, as with any advice: "one test result is worth one-thousand expert opinions" (Werner von Braun).