Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Databricks fails writing after writing ~30 files

pine
New Contributor III

Good day,

Copy of https://stackoverflow.com/questions/69974301/looping-through-files-in-databricks-fails

I have 100 CSV files in an ADLS Gen1 store. I want to process them and save the results to the same drive, in a different directory.

import time
import pandas as pd

def lookup_csv(CR_nro, hlo_lista=[], output=my_output_dir):
  base_lib = 'adl://azuredatalakestore.net/<address>'
  # List the input files and drop those that already have an output directory
  all_files = pd.DataFrame(dbutils.fs.ls(base_lib + f'CR{CR_nro}'), columns=['full', 'name', 'size'])
  done = pd.DataFrame(dbutils.fs.ls(output), columns=['full', 'name', 'size'])
  all_files = all_files[~all_files['name'].isin(done['name'].str.replace('/', ''))]
  all_files = all_files[~all_files['name'].str.contains('header')]

  # Take the schema from the header file and keep only the requested columns present in it
  my_scema = spark.read.csv(base_lib + f'CR{CR_nro}/header.csv', sep='\t', header=True, maxColumns=1000000).schema
  tmp_lst = ['CHROM', 'POS', 'ID', 'REF', 'ALT', 'QUAL', 'FILTER', 'INFO', 'FORMAT'] + [i for i in hlo_lista if i in my_scema.fieldNames()]

  # Read each remaining file, select the columns, and write the result
  for my_file in all_files.iterrows():
    print(my_file[1]['name'], time.ctime(time.time()))
    data = spark.read.option('comment', '#').option('maxColumns', 1000000).schema(my_scema).csv(my_file[1]['full'], sep='\t').select(tmp_lst)
    data.write.csv(output + my_file[1]['name'], header=True, sep='\t')
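The function above is restartable because it skips inputs that already have an output. A minimal pure-Python sketch of that filter, without pandas, might look like the following. The helper name `pending_files` and the sample file names are made up for illustration, and it strips only the trailing '/' that a directory listing adds, rather than every '/' as the original does.

```python
def pending_files(input_names, done_names):
    """Return input names that have no matching output yet, skipping header files."""
    # Spark writes each output as a directory, so a listing reports it with a trailing '/'.
    done = {name.rstrip('/') for name in done_names}
    return [
        name for name in input_names
        if name not in done and 'header' not in name
    ]


# Hypothetical listing: one file already written, one pending, plus the header file.
remaining = pending_files(
    ['CR03_pt1.vcf.gz', 'CR03_pt2.vcf.gz', 'header.csv'],
    ['CR03_pt1.vcf.gz/'],
)
print(remaining)
```

Because the filter depends only on what is already in the output directory, rerunning the function after a failure picks up where the previous run stopped.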

This works for ~30 files, then fails with the error:

Py4JJavaError: An error occurred while calling o70690.csv.
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 154.0 failed 4 times, most recent failure: Lost task 0.3 in stage 154.0 (TID 1435, 10.11.64.46, executor 7): com.microsoft.azure.datalake.store.ADLException: Error creating file <my_output_dir>CR03_pt29.vcf.gz/_started_1438828951154916601
Operation CREATE failed with HTTP401 : null
Last encountered exception thrown after 2 tries. [HTTP401(null),HTTP401(null)]

Apparently my credentials for accessing the ADL drive fail? Or does the time allowed for a single command expire? Any idea why this would happen?

If I queue the commands:

 lookup_csv(<inputs>)

<new cell>

 lookup_csv(<inputs>)

It works, fails, and then works again in the next cell. But if I try to loop the commands:

for i in range(10):
    lookup_csv(<inputs>)

it fails and keeps failing till the end of time.

Maybe I need to refresh the credentials every 10 files or something?

1 ACCEPTED SOLUTION

Hubert-Dudek
Esteemed Contributor III

You can access the storage without a mount, but you still need to register an app and apply the config via Spark settings in your notebook to get access to ADLS. Thanks to the Azure app, the access should last for the whole session:

spark.conf.set("fs.azure.account.auth.type", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id", "<your-client-id>")
spark.conf.set("fs.azure.account.oauth2.client.secret", "<your-secret>")
spark.conf.set("fs.azure.account.oauth2.client.endpoint", "https://login.microsoftonline.com/<your-endpoint>/oauth2/token")

The best explanation is at https://docs.databricks.com/data/data-sources/azure/adls-gen2/azure-datalake-gen2-sp-access.html#acc... although I remember that I also had problems with it the first few times. That page also explains how to register an app. Maybe this will be acceptable under your company policies.
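Note that the settings quoted above use the `fs.azure.*` keys, which apply to ADLS Gen2 (`abfss://` paths), while the question reads from `adl://` paths on ADLS Gen1. As a hedged sketch, the Gen1 equivalent for a service principal would, as far as I know, use the `fs.adl.oauth2.*` keys below; older runtimes may expect a different key prefix, so check the ADLS Gen1 documentation for your runtime. The helper name `adls_gen1_oauth_conf` and the `<your-tenant-id>` placeholder are assumptions for illustration, and in practice the secret would better come from a secret scope than from a literal string.

```python
def adls_gen1_oauth_conf(client_id, client_secret, tenant_id):
    """Build the Spark settings for service-principal access to ADLS Gen1."""
    return {
        "fs.adl.oauth2.access.token.provider.type": "ClientCredential",
        "fs.adl.oauth2.client.id": client_id,
        "fs.adl.oauth2.credential": client_secret,
        "fs.adl.oauth2.refresh.url":
            f"https://login.microsoftonline.com/{tenant_id}/oauth2/token",
    }


# Placeholder values; replace them with those of your registered app.
conf = adls_gen1_oauth_conf("<your-client-id>", "<your-secret>", "<your-tenant-id>")

# In a Databricks notebook, where `spark` is predefined, apply the settings with:
# for key, value in conf.items():
#     spark.conf.set(key, value)
```

Building the settings as a dictionary first keeps them in one place and makes it easy to inspect them before applying them to the session.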

5 REPLIES 5

Hubert-Dudek
Esteemed Contributor III

Was anything actually created by the script in the directory <my_output_dir>?

The best option would be to permanently mount the ADLS storage and use an Azure app for that.

In Azure, go to App registrations and register an app, named for example "databricks_mount". Then assign that app the IAM role "Storage Blob Data Contributor" on your data lake storage.

configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<your-client-id>",
    "fs.azure.account.oauth2.client.secret": "<your-secret>",
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<your-endpoint>/oauth2/token",
}

dbutils.fs.mount(
    source="abfss://delta@yourdatalake.dfs.core.windows.net/",
    mount_point="/mnt/delta",
    extra_configs=configs)
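Since `dbutils.fs.mount` raises an error when the mount point is already in use, it can help to check the existing mounts first. Here is a minimal sketch, assuming the entries returned by `dbutils.fs.mounts()` expose a `mountPoint` attribute. The helper `is_mounted` and the `FakeMount` stand-in are hypothetical and exist only so the example runs outside a notebook.

```python
from collections import namedtuple


def is_mounted(mounts, mount_point):
    """Return True if mount_point already appears in the given mount list."""
    return any(m.mountPoint == mount_point for m in mounts)


# Stand-in for the entries returned by dbutils.fs.mounts(), for illustration only.
FakeMount = namedtuple("FakeMount", ["mountPoint", "source"])
existing = [FakeMount("/mnt/delta", "abfss://delta@yourdatalake.dfs.core.windows.net/")]

print(is_mounted(existing, "/mnt/delta"))
print(is_mounted(existing, "/mnt/other"))

# In a notebook, the guard would read:
# if not is_mounted(dbutils.fs.mounts(), "/mnt/delta"):
#     dbutils.fs.mount(source=..., mount_point="/mnt/delta", extra_configs=configs)
```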

pine
New Contributor III

Yes. The ~29 files are created just fine.

On SO it was pointed out that AD passthrough credentials expire after 60 minutes within a single command cell. This strikes me as a plausible explanation.
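If the 60-minute expiry is indeed the cause, one possible workaround is to stop each command before the limit and let the next cell continue, since the function already skips finished files. Below is a rough sketch of such a time-budgeted loop. The helper name `process_with_budget`, its parameters, and the 50-minute budget are assumptions for illustration, not something from the Databricks API. The budget is checked only between items, so a single very long write could still cross the limit.

```python
import time


def process_with_budget(items, handler, budget_seconds, clock=time.monotonic):
    """Call handler(item) for each item until the time budget is used up.

    Returns the items that were not started, so a later command can pick them up.
    """
    start = clock()
    items = list(items)
    for index, item in enumerate(items):
        # Stop before starting a new item once the budget has been spent.
        if clock() - start >= budget_seconds:
            return items[index:]
        handler(item)
    return []


# Toy usage: the handler only records the item; a real one would read and write a file.
processed = []
leftover = process_with_budget(['a', 'b', 'c'], processed.append, budget_seconds=50 * 60)
print(processed, leftover)
```

The `clock` parameter is there only so that the cutoff can be exercised with a fake clock instead of waiting for real time to pass.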

Hi @pine,

It seems like the error is coming from your storage layer. Try following @Hubert Dudek's recommendation to create a mount point.

pine
New Contributor III

So it seems. Too bad corporate bureaucracy strictly forbids mounting anything.

Well, I suppose I'll have to bring this up with the policy board.
