
Pyspark: You cannot use dbutils within a spark job

Nandini
New Contributor II

I am trying to parallelise file copies in Databricks. Using multiple executors is one way to do that, so I wrote this piece of code in PySpark.

def parallel_copy_execution(src_path: str, target_path: str):
  files_in_path = dbutils.fs.ls(src_path)
  file_paths_df = spark.sparkContext.parallelize(files_in_path).toDF()
  file_paths_df.foreach(lambda x: dbutils.fs.cp(x.path.toString(), target_path, recurse=True))

I fetched all the files to copy and created a DataFrame. When I run foreach on that DataFrame, I get the following error:

You cannot use dbutils within a spark job or otherwise pickle it.
            If you need to use getArguments within a spark job, you have to get the argument before
            using it in the job. For example, if you have the following code:
 
              myRdd.map(lambda i: dbutils.args.getArgument("X") + str(i))
 
            Then you should use it this way:
 
              argX = dbutils.args.getArgument("X")
              myRdd.map(lambda i: argX + str(i))

But when I try the same in Scala, it works perfectly, even though dbutils is used inside a Spark job there. I am attaching that piece of code as well.

def parallel_copy_execution(p: String, t: String): Unit = {
  dbutils.fs.ls(p).map(_.path).toDF.foreach { file =>
    dbutils.fs.cp(file(0).toString, t, recurse = true)
    println(s"cp file: $file")
  }
}

Has the PySpark API not been updated to handle this?

If so, please suggest an alternative way to run the dbutils command in parallel.

10 REPLIES

Ajay-Pandey
Esteemed Contributor III

I think the PySpark API does not support this at the moment.

Nandini
New Contributor II

Thanks! Is there any other way to do this with parallel processing?

pulkitm
New Contributor III

@Nandini Raja Were you able to find a solution for this? We are trying to bulk copy files from S3 to ADLS Gen2, and dbutils being single-threaded is a pain. I even tried the Scala code that worked for you, but I get the error below:

Caused by: KeyProviderException: Failure to initialize configuration

Caused by: InvalidConfigurationValueException: Invalid configuration value detected for fs.azure.account.key

This may be because this configuration is not available at the executor level.

Any solution you were able to figure out?

KVNARK
Honored Contributor II

Maybe we can try FileUtil.copy.
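As a rough sketch of what that could look like from PySpark: Hadoop's `FileUtil.copy` can be reached through the JVM gateway on the Spark session. The helper name `hadoop_copy` and the paths in the usage comment are hypothetical. The call runs on the driver and copies one path per call, so it does not parallelise anything by itself, and it assumes the storage credentials are already present in the Hadoop configuration.

```python
def hadoop_copy(spark, src: str, dst: str, overwrite: bool = True) -> bool:
    """Copy src to dst with Hadoop's FileUtil.copy, invoked from the driver."""
    jvm = spark._jvm
    conf = spark._jsc.hadoopConfiguration()
    src_path = jvm.org.apache.hadoop.fs.Path(src)
    dst_path = jvm.org.apache.hadoop.fs.Path(dst)
    # Resolve a FileSystem per path so source and target may use different schemes
    src_fs = src_path.getFileSystem(conf)
    dst_fs = dst_path.getFileSystem(conf)
    # Arguments: srcFS, src, dstFS, dst, deleteSource, overwrite, conf
    return jvm.org.apache.hadoop.fs.FileUtil.copy(
        src_fs, src_path, dst_fs, dst_path, False, overwrite, conf
    )


# Hypothetical usage in a notebook where `spark` is the active session:
# hadoop_copy(spark, "s3a://my-bucket/data/part-0.parquet",
#             "abfss://container@account.dfs.core.windows.net/data/part-0.parquet")
```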

Anonymous
Not applicable

Could you use Delta clone to copy tables?

Hubert-Dudek
Esteemed Contributor III

Yes, I think the best option is to rewrite the code entirely and use, for example, COPY INTO.

  • dbutils uses just one core
  • RDD code is not optimized by Catalyst or AQE
  • high-level commands like COPY INTO are executed in a distributed way and are optimized
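For illustration, a minimal sketch of a COPY INTO statement built from Python. Note that COPY INTO loads file contents into a Delta table rather than copying files byte for byte, so it only fits when ingestion is the actual goal. The helper name, table name, and storage path are hypothetical, and the target table is assumed to exist already.

```python
def copy_into_sql(table: str, source: str, file_format: str = "PARQUET") -> str:
    """Build a Databricks COPY INTO statement for an existing Delta table."""
    return (
        f"COPY INTO {table}\n"
        f"FROM '{source}'\n"
        f"FILEFORMAT = {file_format}"
    )


# Hypothetical table and path, for illustration only
stmt = copy_into_sql(
    "main.raw.events",
    "abfss://landing@myaccount.dfs.core.windows.net/events/",
)
print(stmt)
# On a cluster, the statement would be submitted with: spark.sql(stmt)
```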

Hubert-Dudek
Esteemed Contributor III

Alternatively, copy files using Azure Data Factory. It has great throughput.

VaibB
Contributor

You can't use dbutils commands inside the PySpark API. Try using S3 copy or the equivalent in Azure.

Matt101122
Contributor

@Nandini Raja I did something similar by using shutil instead of dbutils. This worked for copying many local files to Azure Storage in parallel. However, the issue I'm having now is finding a Unity Catalog friendly solution, as mounting Azure Storage isn't recommended. (shutil and os won't work with abfss:// paths.)
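A minimal sketch of one way to run shutil copies in parallel, using a thread pool on the driver. This is an illustration and not necessarily the exact approach described above; the function name `parallel_copy` and the worker count are assumptions. It only handles paths the local file system can see (local disk or a mounted location), not abfss:// URIs.

```python
import os
import shutil
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Dict, Iterable


def parallel_copy(
    sources: Iterable[str], target_dir: str, max_workers: int = 8
) -> Dict[str, Exception]:
    """Copy each source file into target_dir using a thread pool.

    Returns a dict mapping each failed source path to the exception raised.
    """
    os.makedirs(target_dir, exist_ok=True)
    failures: Dict[str, Exception] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Submit one copy task per file; keep the source path for error reporting
        futures = {
            pool.submit(
                shutil.copy2, src, os.path.join(target_dir, os.path.basename(src))
            ): src
            for src in sources
        }
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as exc:  # collect errors instead of aborting the batch
                failures[futures[future]] = exc
    return failures
```

Threads are a reasonable fit here because file copies are I/O bound; the returned dict makes it easy to retry only the files that failed.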

Etyr
Contributor

If you have a Spark session, you can use Spark's hidden Hadoop FileSystem:

# Get FileSystem from SparkSession
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
# Get Path class to convert string path to FS path
path = spark._jvm.org.apache.hadoop.fs.Path
 
# List files
fs.listStatus(path("/path/to/data")) # Should work with mounted points
# Rename file
fs.rename(path("OriginalName"), path("NewName"))
# Delete file (True = recursive)
fs.delete(path("/path/to/data"), True)
# Upload file to DBFS root
fs.copyFromLocalFile(path(local_file_path), path(remote_file_path))
# Download file from DBFS root
fs.copyToLocalFile(path(remote_file_path), path(local_file_path))

If you have Azure Storage, you should mount it to your cluster, and then you can access it with either `abfss://` or `/mnt/`.
