Data Engineering
Read/Write concurrency issue

APol
New Contributor II

Hi.

I assume this may be a concurrency issue (a read thread from Databricks and a write thread from another system).

From the start:

  1. Read 12-16 CSV files (approximately 250 MB each) into a dataframe:
     df = (spark.read
           .option("header", "false")
           .option("delimiter", ",")
           .option("quote", '"')
           .option("multiLine", "true")
           .option("escape", '"')
           .option("dateFormat", "yyyy-MM-dd'T'HH:mm:ss'Z'")
           .schema(schema)
           .csv(partition_list))
  2. Print the row count: print(df.count())
  3. Save the dataframe as a Delta table: df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").saveAsTable(f"{db_name}.{table_name}")

This process is running once a day.

Sometimes I receive this error: "An error occurred while calling oXXXX.saveAsTable" (the first two steps always work correctly).

There is one important detail: while I am reading these files from ADLS, some of them can be overwritten by another system (according to the files' LastModified dates in storage).
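One way to rule the overwrites out is to snapshot the files into a job-private staging folder before reading, so a concurrent writer cannot change them mid-read. A minimal sketch (the paths, the job name, and the `staging_path` helper are my own hypothetical names; `dbutils` is the Databricks notebook utility object):

```python
import time

def staging_path(base: str, job_name: str) -> str:
    """Build a unique staging folder per run, e.g. <base>/_staging/<job>/<epoch>."""
    return f"{base}/_staging/{job_name}/{int(time.time())}"

# In a Databricks notebook (assumption: dbutils and spark are in scope):
# dst = staging_path("abfss://container@account.dfs.core.windows.net/path", "daily_load")
# for src in partition_list:
#     dbutils.fs.cp(src, dst)          # copy each CSV into the private snapshot
# df = spark.read.schema(schema).csv(dst)  # read the snapshot, not the live files
```

The copy itself can still hit the same 412 if a file changes during the copy, but it narrows the window and makes the read/count/write steps operate on immutable input.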

I will add error output in attachment.

Do you know what could cause this error and how it can be solved?

2 REPLIES

jose_gonzalez
Moderator

The error message shows:

Caused by: java.lang.IllegalStateException: Error reading from input
    at com.univocity.parsers.common.input.DefaultCharInputReader.reloadBuffer(DefaultCharInputReader.java:84)
    at com.univocity.parsers.common.input.AbstractCharInputReader.updateBuffer(AbstractCharInputReader.java:203)
    at com.univocity.parsers.common.input.AbstractCharInputReader.nextChar(AbstractCharInputReader.java:280)
    at com.univocity.parsers.common.input.DefaultCharAppender.appendUntil(DefaultCharAppender.java:292)
    at com.univocity.parsers.common.input.ExpandingCharAppender.appendUntil(ExpandingCharAppender.java:177)
    at com.univocity.parsers.csv.CsvParser.parseSingleDelimiterRecord(CsvParser.java:194)
    at com.univocity.parsers.csv.CsvParser.parseRecord(CsvParser.java:109)
    at com.univocity.parsers.common.AbstractParser.parseNext(AbstractParser.java:581)
    ... 34 more
Caused by: java.io.IOException: java.io.IOException: Operation failed: "The condition specified using HTTP conditional header(s) is not met.", 412, GET, https://ACCOUNT_NAME.dfs.core.windows.net/CONTAINER_NAME/INSTANCE_NAME/Tables/Custom/FOLDER_NAME/fil..., ConditionNotMet, "The condition specified using HTTP conditional header(s) is not met. RequestId:d4a3e6af-701f-003e-3590-b7b51a000000 Time:2022-08-24T08:03:57.9309350Z"
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.services.ReadBufferWorker.run(ReadBufferWorker.java:77)
    ... 1 more

This is a 412 (Precondition Failed) error. Could you open a support ticket and share the full error message? The Storage team should be able to pull the logs and provide more information on why this is happening.
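Since the 412 only appears when the source files change mid-read, one temporary mitigation is to retry the whole read-and-write when it fails. A generic sketch (not a Databricks API; `run_daily_load` is a hypothetical wrapper around the three steps in the question):

```python
import time

def with_retries(action, attempts: int = 3, delay_s: float = 0.0):
    """Call action(); if it raises, retry up to `attempts` times, then re-raise."""
    last_error = None
    for _ in range(attempts):
        try:
            return action()
        except Exception as err:  # in the real job, catch the specific Py4J error
            last_error = err
            time.sleep(delay_s)   # give the other system time to finish writing
    raise last_error

# In the notebook this would wrap the whole read + count + write, e.g.:
# with_retries(lambda: run_daily_load(), attempts=3, delay_s=60)
```

Each retry re-opens the files, so it picks up fresh ETags; it only helps if the other system's overwrites are short-lived.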

FerArribas
Contributor

Hi @Anastasiia Polianska​,

I agree, it looks like a concurrency issue. It is very likely caused by a mismatched ETag in the HTTP call to the Azure Storage API (https://azure.microsoft.com/de-de/blog/managing-concurrency-in-microsoft-azure-storage-2/).
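To illustrate what the linked article describes: Azure Storage uses the blob's ETag as an optimistic concurrency token, and a read that carries a stale ETag in its If-Match header fails server-side with 412 ConditionNotMet. A toy in-memory illustration (not Azure code):

```python
class Blob:
    """Toy stand-in for a blob with an ETag that changes on every write."""
    def __init__(self, data: bytes):
        self.data = data
        self.etag = 1

    def write(self, data: bytes):
        self.data = data
        self.etag += 1  # any overwrite invalidates previously captured ETags

    def read_if_match(self, expected_etag: int) -> bytes:
        # The server-side check behind the "If-Match" HTTP header
        if self.etag != expected_etag:
            raise RuntimeError("412 ConditionNotMet")
        return self.data

blob = Blob(b"v1")
snapshot_etag = blob.etag  # the reader opens the file and records its ETag
blob.write(b"v2")          # another system overwrites the file mid-read
# blob.read_if_match(snapshot_etag) would now raise "412 ConditionNotMet"
```

This matches the failure mode in the question: Spark opens a CSV, caches its ETag, and a later buffered read fails once the other system has replaced the file.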

The concurrency behavior can be configured through the hadoop-azure library (the library used to access ADLS via abfss), as described in its documentation:

https://hadoop.apache.org/docs/stable/hadoop-azure/abfs.html#Server_Options
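A sketch of the relevant setting, assuming the Server Options table for your hadoop-azure version includes fs.azure.io.read.tolerate.concurrent.append (worth verifying before relying on it). When enabled, reads no longer insist on the ETag captured at open time, so a file overwritten mid-read does not fail with 412 ConditionNotMet, at the cost of possibly reading a mix of old and new bytes:

```python
# In a Databricks notebook, or in the cluster's Spark config
# (assumption: verify this key exists in your hadoop-azure version):
# spark.conf.set("fs.azure.io.read.tolerate.concurrent.append", "true")
```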

These links should help you understand and solve the problem:

Hadoop-ABFS

Thanks.

Fernando Arribas.
