Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Read/Write concurrency issue

APol
New Contributor II

Hi.

I assume it may be a concurrency issue (a read from Databricks and a concurrent write from another system).

From the start:

  1. I read 12-16 CSV files (approximately 250 MB each) into a dataframe:
     df = spark.read.option("header", "False").option("delimiter", ',').option('quote','"').option("multiLine","true").option("escape", "\"").option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'").schema(schema).csv(partition_list)
  2. Print the row count:
     print(df.count())
  3. Save the dataframe as a Delta table:
     df.write.format('delta').mode('overwrite').option("overwriteSchema","true").saveAsTable(f"{db_name}.{table_name}")

This process is running once a day.
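The three steps above can be collected into one sketch. This is only an illustration of the flow described in the post: `spark`, `schema`, `partition_list`, `db_name`, and `table_name` are the poster's own names and are assumed to be supplied by the caller, so no PySpark import is needed at module level.

```python
# Reader options from the steps above, gathered in one place.
CSV_OPTIONS = {
    "header": "False",
    "delimiter": ",",
    "quote": '"',
    "multiLine": "true",
    "escape": '"',
    "dateFormat": "yyyy-MM-dd'T'HH:mm:ss'Z'",
}

def run_daily_load(spark, schema, partition_list, db_name, table_name):
    """Read the CSV partitions, print the row count, overwrite the Delta table."""
    df = spark.read.options(**CSV_OPTIONS).schema(schema).csv(partition_list)
    print(df.count())  # step 2: forces a full scan of the input files
    (df.write.format("delta")
        .mode("overwrite")
        .option("overwriteSchema", "true")
        .saveAsTable(f"{db_name}.{table_name}"))
```

Note that `df.count()` triggers one full read of the CSV files and `saveAsTable` triggers another, so the source files are scanned twice per run; a concurrent overwrite at any point during either scan can invalidate the read in progress.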

Sometimes I receive this error: "An error occurred while calling oXXXX.saveAsTable" (the first two steps always work correctly).

One important detail: while I am reading these files from ADLS, some of them can be overwritten by another system (according to the files' LastModified dates in storage).

I will attach the error output.

Do you know what could cause this error and how it can be solved?

2 REPLIES

jose_gonzalez
Moderator

The error message shows:

Caused by: java.lang.IllegalStateException: Error reading from input
at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:84)
at com.univocity.parsers.common.input.AbstractCharInputReader.updateBuffer(AbstractCharInputReader.java:203)
at com.univocity.parsers.common.input.AbstractCharInputReader.nextChar(AbstractCharInputReader.java:280)
at com.univocity.parsers.common.input.DefaultCharAppender.appendUntil(DefaultCharAppender.java:292)
at com.univocity.parsers.common.input.ExpandingCharAppender.appendUntil(ExpandingCharAppender.java:177)
at com.univocity.parsers.csv.CsvParser.parseSingleDelimiterRecord(CsvParser.java:194)
at com.univocity.parsers.csv.CsvParser.parseRecord(CsvParser.java:109)
at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:581)
... 34 more
Caused by: java.io.IOException: java.io.IOException: Operation failed: "The condition specified using HTTP conditional header(s) is not met.", 412, GET, https://ACCOUNT_NAME.dfs.core.windows.net/CONTAINER_NAME/INSTANCE_NAME/Tables/Custom/FOLDER_NAME/fil..., ConditionNotMet, "The condition specified using HTTP conditional header(s) is not met. RequestId:d4a3e6af-701f-003e-3590-b7b51a000000 Time:2022-08-24T08:03:57.9309350Z"
at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.ReadBufferWorker.run(ReadBufferWorker.java:77)
... 1 more

This is a 412 (Precondition Failed) error. Could you open a support ticket and share the error message? The Azure Storage team should be able to pull the logs and provide more information on why this is happening.
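Until the root cause is resolved, a blunt mitigation is to re-run the whole job when this particular error surfaces. A minimal sketch, assuming the exception text contains "ConditionNotMet" or "412" as in the trace above; the `job` callable and the retry/backoff parameters are illustrative, not part of any Databricks API:

```python
import time

def run_with_retries(job, max_attempts=3, backoff_seconds=60):
    """Re-run `job` when it fails with the ADLS 412/ConditionNotMet error.

    `job` is any zero-argument callable (e.g. the daily load).
    Any other exception is re-raised immediately.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return job()
        except Exception as exc:
            transient = "ConditionNotMet" in str(exc) or "412" in str(exc)
            if not transient or attempt == max_attempts:
                raise
            time.sleep(backoff_seconds)  # give the other writer time to finish
```

A retry only papers over the race: if the other system rewrites files on every run, scheduling the two jobs so they do not overlap (or reading from a snapshot/staging copy) is the more robust fix.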

FerArribas
Contributor

Hi @Anastasiia Polianska,

I agree, it looks like a concurrency issue. Very likely it is caused by an ETag mismatch in the HTTP call to the Azure Storage API (https://azure.microsoft.com/de-de/blog/managing-concurrency-in-microsoft-azure-storage-2/).
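The optimistic-concurrency mechanism behind that 412 can be illustrated without Azure at all: the reader records the blob's ETag when it opens the file, and the service rejects a conditional read once a writer has changed it. A toy simulation (all names here are illustrative, not the Azure SDK):

```python
class TinyBlobStore:
    """Toy model of ETag-based conditional reads (HTTP If-Match)."""

    def __init__(self, data):
        self.data = data
        self.etag = 1  # bumped on every write

    def write(self, data):
        self.data = data
        self.etag += 1

    def read(self, if_match):
        # Azure returns 412 ConditionNotMet when the ETag no longer matches.
        if if_match != self.etag:
            raise RuntimeError("412 ConditionNotMet")
        return self.data

blob = TinyBlobStore("old,rows")
tag = blob.etag          # reader opens the file and records its ETag
blob.write("new,rows")   # another system overwrites the file mid-read
# blob.read(if_match=tag) would now raise "412 ConditionNotMet"
```

This matches the symptom in the thread: the ABFS driver reads a large CSV in buffered chunks with the original ETag attached, so an overwrite between chunk fetches makes the next conditional GET fail.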

The retry behavior can be configured as described in the hadoop-azure library documentation (hadoop-azure is the library used to access ADLS via abfss):

https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Server_Options
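As a sketch of that configuration, the ABFS driver's exponential-retry policy can be tuned from a Databricks notebook via `spark.conf.set`. The keys below appear in the hadoop-azure ABFS documentation, but verify the exact names and defaults against the page above for your Hadoop version before relying on them:

```python
# ABFS retry-policy tuning (key names per the hadoop-azure ABFS docs;
# check them for your Hadoop version). Values are milliseconds/counts.
spark.conf.set("fs.azure.io.retry.max.retries", "30")
spark.conf.set("fs.azure.io.retry.min.backoff.interval", "3000")
spark.conf.set("fs.azure.io.retry.max.backoff.interval", "30000")
```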

These links should help you understand and solve the problem:

Hadoop-ABFS

Thanks.

Fernando Arribas.