Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Pyspark: You cannot use dbutils within a spark job

Nandini
New Contributor II

I am trying to parallelise file copies in Databricks, and making use of multiple executors is one way to do that. This is the PySpark code I wrote:

def parallel_copy_execution(src_path: str, target_path: str):
  files_in_path = dbutils.fs.ls(src_path)
  file_paths_df = spark.sparkContext.parallelize(files_in_path).toDF()
  file_paths_df.foreach(lambda x: dbutils.fs.cp(x.path.toString(), target_path, recurse=True))

I fetched all the files to copy and created a DataFrame. When I run foreach on the DataFrame, I get the following error:

`You cannot use dbutils within a spark job`

You cannot use dbutils within a spark job or otherwise pickle it.
            If you need to use getArguments within a spark job, you have to get the argument before
            using it in the job. For example, if you have the following code:
 
              myRdd.map(lambda i: dbutils.args.getArgument("X") + str(i))
 
            Then you should use it this way:
 
              argX = dbutils.args.getArgument("X")
              myRdd.map(lambda i: argX + str(i))

But when I try the same thing in Scala, it works perfectly, even though dbutils is used inside a Spark job there as well. Here is that code:

def parallel_copy_execution(p: String, t: String): Unit = {
  dbutils.fs.ls(p).map(_.path).toDF.foreach { file =>
    dbutils.fs.cp(file(0).toString,t , recurse=true)
    println(s"cp file: $file")
  }
}

Has the PySpark API not been updated to handle this?

If so, please suggest an alternative for running the dbutils command in parallel.

10 REPLIES

Ajay-Pandey
Esteemed Contributor III

I think the PySpark API does not support this at the moment.

Ajay Kumar Pandey

Nandini
New Contributor II

Thanks! Is there any other way to do the copy in parallel?

pulkitm
New Contributor III

@Nandini Raja Were you able to find a solution for this? We are trying to bulk copy files from S3 to ADLS Gen2, and dbutils being single-threaded is a pain. I even tried the Scala code that worked for you, but I get the error below:

Caused by: KeyProviderException: Failure to initialize configuration

Caused by: InvalidConfigurationValueException: Invalid configuration value detected for fs.azure.account.key

This may be because this configuration is not available at the executor level.

Were you able to figure out a solution?

KVNARK
Honored Contributor II

Maybe we can try FileUtil.copy.
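A minimal sketch of what that could look like from PySpark, assuming `FileUtil` here means Hadoop's `org.apache.hadoop.fs.FileUtil` and that it is reached through the session's JVM gateway. The function name `hadoop_copy` and its parameters are made up for illustration. The call runs on the driver only, so on its own it does not parallelise anything, and I have not verified it against S3 or ADLS credentials:

```python
def hadoop_copy(spark, src: str, dst: str, delete_source: bool = False) -> bool:
    """Copy src to dst on the driver using Hadoop's FileUtil.copy via the JVM gateway.

    `spark` is an existing SparkSession; hadoop_copy itself is a hypothetical helper.
    """
    jvm = spark._jvm
    conf = spark._jsc.hadoopConfiguration()

    src_path = jvm.org.apache.hadoop.fs.Path(src)
    dst_path = jvm.org.apache.hadoop.fs.Path(dst)

    # Resolve each path's own file system so source and target may use different schemes
    src_fs = src_path.getFileSystem(conf)
    dst_fs = dst_path.getFileSystem(conf)

    # Java signature: FileUtil.copy(srcFS, src, dstFS, dst, deleteSource, conf)
    return jvm.org.apache.hadoop.fs.FileUtil.copy(
        src_fs, src_path, dst_fs, dst_path, delete_source, conf
    )
```

Usage would be something like `hadoop_copy(spark, "/path/to/source", "/path/to/target")`, with both paths being placeholders.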

Anonymous
Not applicable

Could you use delta clone to copy tables?

Hubert-Dudek
Esteemed Contributor III

Yes, I think the best approach is to rebuild the code entirely and use, for example, COPY INTO.

  • dbutils uses just one core
  • RDDs are not optimized by Catalyst or AQE
  • high-level commands like COPY INTO are executed in a distributed way and are optimized

Hubert-Dudek
Esteemed Contributor III

Alternatively, copy files using Azure Data Factory. It has great throughput.

VaibB
Contributor

You can't use dbutils commands inside the PySpark API. Try using S3 copy or the equivalent in Azure.

Matt101122
Contributor

@Nandini Raja I did something similar by using shutil instead of dbutils. This worked for copying many local files to Azure Storage in parallel. However, the issue I'm having now is finding a Unity Catalog friendly solution, as mounting Azure Storage isn't recommended. (shutil and os won't work with abfss:// paths.)
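For anyone wanting to try the shutil route, here is a rough sketch of the idea using a thread pool on the driver. It is not the exact code described above: the function name `parallel_copy` and the `max_workers` default are assumptions for illustration, and it only handles paths the local file system can see (for example a mounted location), not abfss:// paths:

```python
import shutil
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import List


def parallel_copy(src_dir: str, target_dir: str, max_workers: int = 8) -> List[str]:
    """Copy every regular file directly under src_dir into target_dir using threads."""
    target = Path(target_dir)
    target.mkdir(parents=True, exist_ok=True)
    files = [p for p in Path(src_dir).iterdir() if p.is_file()]

    def copy_one(src: Path) -> str:
        # shutil.copy2 copies contents plus metadata and returns the destination path
        return str(shutil.copy2(src, target / src.name))

    # Threads are a reasonable fit because file copying is I/O-bound
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # list() waits for all copies and re-raises the first failure, if any
        return list(pool.map(copy_one, files))
```

Everything here runs on the driver, so nothing has to be pickled and sent to executors. The copy is not recursive; subdirectories are skipped.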

Etyr
Contributor

If you have a Spark session, you can use the Hadoop FileSystem that Spark exposes through its JVM gateway:

# Get FileSystem from SparkSession
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
# Get Path class to convert string path to FS path
path = spark._jvm.org.apache.hadoop.fs.Path
 
# List files
fs.listStatus(path("/path/to/data")) # Should work with mounted points
# Rename file
fs.rename(path("OriginalName"), path("NewName"))
# Delete file
fs.delete(path("/path/to/data"))
# Upload file to DBFS root
fs.copyFromLocalFile(path(local_file_path), path(remote_file_path))
# Download file from DBFS to the local file system
fs.copyToLocalFile(path(remote_file_path), path(local_file_path))

If you have Azure Storage, you should mount it to your cluster; you can then access it with either `abfss://` or `/mnt/`.
