12-05-2022 12:19 AM
I am trying to parallelise file copies in Databricks. Making use of multiple executors is one way, so this is the piece of code that I wrote in PySpark.
def parallel_copy_execution(src_path: str, target_path: str):
    files_in_path = dbutils.fs.ls(src_path)
    file_paths_df = spark.sparkContext.parallelize(files_in_path).toDF()
    file_paths_df.foreach(lambda x: dbutils.fs.cp(x.path, target_path, recurse=True))
I fetched all the files to copy and created a DataFrame. When I run a foreach on the DataFrame, I get the following error:
You cannot use dbutils within a spark job or otherwise pickle it.
If you need to use getArguments within a spark job, you have to get the argument before
using it in the job. For example, if you have the following code:
myRdd.map(lambda i: dbutils.args.getArgument("X") + str(i))
Then you should use it this way:
argX = dbutils.args.getArgument("X")
myRdd.map(lambda i: argX + str(i))
But when I try the same thing in Scala, it works perfectly, even though dbutils is used inside a Spark job there. Attaching that piece of code as well.
def parallel_copy_execution(p: String, t: String): Unit = {
  dbutils.fs.ls(p).map(_.path).toDF.foreach { file =>
    dbutils.fs.cp(file(0).toString, t, recurse = true)
    println(s"cp file: $file")
  }
}
Has the PySpark API not been updated to handle this?
If so, please suggest an alternative way to run the dbutils copy in parallel.
12-05-2022 12:30 AM
I think the PySpark API does not support this at the moment.
12-05-2022 12:59 AM
Thanks! Is there any other alternative for parallel processing?
04-16-2023 11:26 PM
@Nandini Raja Were you able to find a solution for this? We are trying to bulk copy files from S3 to ADLS Gen2, and dbutils being single-threaded is a pain. I even tried the Scala code that worked for you, but I get the error below:
Caused by: KeyProviderException: Failure to initialize configuration
Caused by: InvalidConfigurationValueException: Invalid configuration value detected for fs.azure.account.key
This may be because this conf is not available at the executor level.
Any solution you were able to figure out?
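For reference, a minimal sketch of making the account key available; the storage account, secret scope, and key names are placeholders (assumptions). A runtime spark.conf.set is usually enough for driver-side access, while executor-side access typically needs the same setting in the cluster's Spark config:
# Hypothetical names: replace <storage-account>, scope, and key with your own.
storage_account = "<storage-account>"
account_key = dbutils.secrets.get(scope="my-scope", key="adls-account-key")

# Session-level setting: generally sufficient for driver-side access (e.g. dbutils.fs).
spark.conf.set(
    f"fs.azure.account.key.{storage_account}.dfs.core.windows.net",
    account_key,
)

# For executor-side access, the same key/value usually has to be set in the
# cluster's Spark config (Cluster > Advanced options > Spark config):
# fs.azure.account.key.<storage-account>.dfs.core.windows.net <value>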
12-05-2022 01:38 AM
Maybe we can try FileUtil.copy.
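For example, a rough sketch of driving Hadoop's FileUtil.copy from PySpark through the JVM gateway; the paths are placeholders, and this runs on the driver, not inside a Spark job:
# Sketch: copy one file with Hadoop's FileUtil via the py4j gateway.
hadoop = spark._jvm.org.apache.hadoop.fs
conf = spark._jsc.hadoopConfiguration()

src = hadoop.Path("dbfs:/mnt/source/some_file")   # placeholder path
dst = hadoop.Path("dbfs:/mnt/target/some_file")   # placeholder path

src_fs = src.getFileSystem(conf)
dst_fs = dst.getFileSystem(conf)

# copy(srcFS, src, dstFS, dst, deleteSource, conf)
hadoop.FileUtil.copy(src_fs, src, dst_fs, dst, False, conf)
Since this runs on the driver, it could be fanned out over many files with a concurrent.futures.ThreadPoolExecutor.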
12-05-2022 04:27 AM
Could you use delta clone to copy tables?
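If what is being copied is a Delta table rather than loose files, a deep clone could do the copy in one statement; the table names below are placeholders:
# Sketch: DEEP CLONE copies both the metadata and the data files of a Delta table.
spark.sql("""
  CREATE OR REPLACE TABLE target_db.my_table_copy
  DEEP CLONE source_db.my_table
""")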
12-05-2022 08:00 AM
Yes, I think the best approach would be to rebuild the code entirely and use, for example, COPY INTO.
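A minimal sketch of what that could look like, assuming the source files are in a loadable format (Parquet here) and the target Delta table already exists; the table name and path are placeholders:
# Sketch: COPY INTO idempotently loads new files into a Delta table.
spark.sql("""
  COPY INTO target_db.my_table
  FROM 'abfss://container@account.dfs.core.windows.net/landing/'
  FILEFORMAT = PARQUET
""")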
12-05-2022 08:01 AM
Alternatively, copy files using Azure Data Factory. It has great throughput.
12-05-2022 09:21 PM
You can't use dbutils commands inside the PySpark API. Try using S3 copy or its Azure equivalent instead.
01-10-2023 12:14 PM
@Nandini Raja I did something similar by using shutil instead of dbutils. This worked for copying many local files to Azure Storage in parallel. However, the issue I'm having now is finding a Unity Catalog-friendly solution, as mounting Azure Storage isn't recommended (shutil and os won't work with abfss:// paths).
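For what it's worth, the shutil approach was roughly this kind of driver-side thread pool; the paths are placeholders, and it assumes the storage is mounted so it is visible as a local /dbfs path:
import shutil
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

def copy_one(src_file: str, target_dir: str) -> str:
    # shutil works only on local-style paths, e.g. /dbfs/mnt/..., not abfss://
    dst = Path(target_dir) / Path(src_file).name
    shutil.copy(src_file, dst)
    return str(dst)

# Placeholder mount points; list only regular files.
src_files = [str(p) for p in Path("/dbfs/mnt/source").glob("*") if p.is_file()]
with ThreadPoolExecutor(max_workers=16) as pool:
    results = list(pool.map(lambda f: copy_one(f, "/dbfs/mnt/target"), src_files))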
01-11-2023 02:33 AM
If you have a Spark session, you can use the underlying Hadoop FileSystem API:
# Get FileSystem from SparkSession
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
# Get Path class to convert string path to FS path
path = spark._jvm.org.apache.hadoop.fs.Path
# List files
fs.listStatus(path("/path/to/data")) # Should work with mounted points
# Rename file
fs.rename(path("OriginalName"), path("NewName"))
# Delete file
fs.delete(path("/path/to/data"))
# Upload a local file to DBFS
fs.copyFromLocalFile(path(local_file_path), path(remote_file_path))
# Download a file from DBFS to the local filesystem
fs.copyToLocalFile(path(remote_file_path), path(local_file_path))
If you have an Azure Storage account, you should mount it to your cluster; then you can access it with either `abfss://` or `/mnt/` paths.
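If the path is an abfss:// URI rather than a mount, the same Hadoop API can still be used by resolving the FileSystem for that URI; a sketch assuming the cluster already has credentials configured for the storage account (the container and account names are placeholders):
# Sketch: get a FileSystem bound to an abfss:// location instead of DBFS.
jvm = spark._jvm
conf = spark._jsc.hadoopConfiguration()

abfss_root = "abfss://container@account.dfs.core.windows.net/"   # placeholder
uri = jvm.java.net.URI.create(abfss_root)
abfss_fs = jvm.org.apache.hadoop.fs.FileSystem.get(uri, conf)

# Same operations as above, now against the Azure Storage container.
for status in abfss_fs.listStatus(jvm.org.apache.hadoop.fs.Path(abfss_root + "path/to/data")):
    print(status.getPath().toString())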