Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Read/Write concurrency issue

APol
New Contributor II

Hi.

I assume this may be a concurrency issue (a read thread from Databricks and a write thread from another system).

From the start:

  1. I read 12-16 CSV files (approximately 250 MB each) into a dataframe: df = spark.read.option("header", "false").option("delimiter", ",").option("quote", '"').option("multiLine", "true").option("escape", "\"").option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'").schema(schema).csv(partition_list)
  2. Print the row count: print(df.count())
  3. Save the dataframe as a Delta table (the whole job is sketched right after this list): df.write.format('delta').mode('overwrite').option("overwriteSchema", "true").saveAsTable(f"{db_name}.{table_name}")
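Putting the three steps together, this is essentially the whole job (schema, partition_list, db_name, and table_name come from my configuration, and spark is the Databricks session):

```python
# Daily job: load the day's CSV partitions and overwrite the Delta table.
df = (
    spark.read
    .option("header", "false")
    .option("delimiter", ",")
    .option("quote", '"')
    .option("multiLine", "true")
    .option("escape", '"')
    .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
    .schema(schema)
    .csv(partition_list)
)

# Step 2: this triggers a first full scan of the files and always succeeds.
print(df.count())

# Step 3: this triggers a SECOND full scan (Spark does not cache the data
# between actions), and it is here that the job sometimes fails.
(
    df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(f"{db_name}.{table_name}")
)
```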

This process runs once a day.

Sometimes I receive this error: "An error occurred while calling oXXXX.saveAsTable" (the first two steps always work correctly).

One important detail: while these files are being read from ADLS, some of them can be overwritten by another system (according to the files' LastModified dates in storage).

I will attach the error output.

Do you know what can cause this error and how it can be solved?

2 REPLIES

jose_gonzalez
Databricks Employee

The error message shows:

Caused by: java.lang.IllegalStateException: Error reading from input
    at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:84)
    at com.univocity.parsers.common.input.AbstractCharInputReader.updateBuffer(AbstractCharInputReader.java:203)
    at com.univocity.parsers.common.input.AbstractCharInputReader.nextChar(AbstractCharInputReader.java:280)
    at com.univocity.parsers.common.input.DefaultCharAppender.appendUntil(DefaultCharAppender.java:292)
    at com.univocity.parsers.common.input.ExpandingCharAppender.appendUntil(ExpandingCharAppender.java:177)
    at com.univocity.parsers.csv.CsvParser.parseSingleDelimiterRecord(CsvParser.java:194)
    at com.univocity.parsers.csv.CsvParser.parseRecord(CsvParser.java:109)
    at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:581)
    ... 34 more
Caused by: java.io.IOException: java.io.IOException: Operation failed: "The condition specified using HTTP conditional header(s) is not met.", 412, GET, https://ACCOUNT_NAME.dfs.core.windows.net/CONTAINER_NAME/INSTANCE_NAME/Tables/Custom/FOLDER_NAME/fil..., ConditionNotMet, "The condition specified using HTTP conditional header(s) is not met. RequestId:d4a3e6af-701f-003e-3590-b7b51a000000 Time:2022-08-24T08:03:57.9309350Z"
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.ReadBufferWorker.run(ReadBufferWorker.java:77)
    ... 1 more

This is a 412 (Precondition Failed) error from Azure storage. Could you open a support ticket and share the error message? The Storage team should be able to pull the logs and provide more information on why this is happening.
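To illustrate the mechanism behind the 412: ABFS performs conditional reads, sending the ETag it captured when the file was opened in an If-Match header. A minimal sketch of that request pattern in Python (the URL is a placeholder and authentication headers are omitted for brevity):

```python
import requests

# Placeholder URL; a real request also needs an Authorization header.
url = "https://ACCOUNT_NAME.dfs.core.windows.net/CONTAINER_NAME/path/file.csv"

# 1. The client captures the blob's ETag when it opens the file.
etag = requests.head(url).headers["ETag"]

# 2. Each subsequent buffer read is conditional on that ETag.
resp = requests.get(url, headers={"If-Match": etag})

# If another system overwrites the blob between steps 1 and 2, its ETag
# changes and the server answers 412 ConditionNotMet instead of returning
# data -- exactly the error in the stack trace above.
print(resp.status_code)  # 200 normally, 412 after a concurrent overwrite
```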

FerArribas
Contributor

Hi @Anastasiia Polianska,

I agree, it looks like a concurrency issue. Very likely it is caused by an ETag mismatch in the HTTP calls to the Azure Storage API (https://azure.microsoft.com/de-de/blog/managing-concurrency-in-microsoft-azure-storage-2/): count() and saveAsTable() each scan the source files, and if another system overwrites a file between (or during) those scans, the ETag captured when the file was opened no longer matches.

The concurrency behavior can be configured through the hadoop-azure library (the library used to access ADLS via abfss); see the server options in its documentation:

https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Server_Options
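As a sketch of what that configuration could look like (assuming the fs.azure.io.read.tolerate.concurrent.append server option described on that page, which makes ABFS send If-Match: * on reads instead of the ETag captured at open time):

```python
# A sketch under the assumption above, not a definitive fix: with
# fs.azure.io.read.tolerate.concurrent.append set to true, ABFS read calls
# send "If-Match: *" instead of the ETag captured when the file was opened,
# so a concurrent overwrite no longer fails the read with 412 ConditionNotMet.
spark.sparkContext._jsc.hadoopConfiguration().set(
    "fs.azure.io.read.tolerate.concurrent.append", "true"
)
```

Keep the trade-off in mind: with If-Match: * a read that overlaps an overwrite can return a mix of old and new file contents, so having the other system write to a fresh path (and the job read only completed files) may be the safer design.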

These links should help you understand and solve the problem:

Hadoop-ABFS

Thanks.

Fernando Arribas.
