Pyspark: You cannot use dbutils within a spark job
12-05-2022 12:19 AM
I am trying to parallelise file copies in Databricks, and making use of multiple executors is one way to do it. This is the piece of code I wrote in PySpark:
def parallel_copy_execution(src_path: str, target_path: str):
    files_in_path = dbutils.fs.ls(src_path)
    file_paths_df = spark.sparkContext.parallelize(files_in_path).toDF()
    file_paths_df.foreach(lambda x: dbutils.fs.cp(x.path, target_path, recurse=True))
I fetched all the files to copy and created a DataFrame, but when I try to run a foreach over the DataFrame I get the following error. It says that
`You cannot use dbutils within a spark job`
You cannot use dbutils within a spark job or otherwise pickle it.
If you need to use getArguments within a spark job, you have to get the argument before
using it in the job. For example, if you have the following code:
myRdd.map(lambda i: dbutils.args.getArgument("X") + str(i))
Then you should use it this way:
argX = dbutils.args.getArgument("X")
myRdd.map(lambda i: argX + str(i))
But when I try the same thing in Scala, it works perfectly: dbutils is used inside a Spark job there. Attaching that piece of code as well.
def parallel_copy_execution(p: String, t: String): Unit = {
  dbutils.fs.ls(p).map(_.path).toDF.foreach { file =>
    dbutils.fs.cp(file(0).toString, t, recurse = true)
    println(s"cp file: $file")
  }
}
Has the PySpark API not been updated to handle this?
If so, please suggest an alternative way to run dbutils commands in parallel.
- Labels: Parallel processing, Pyspark, Python, Spark job
12-05-2022 12:30 AM
I think the PySpark API does not support this at the moment.
12-05-2022 12:59 AM
Thanks! Is there any other alternative for parallel processing?
04-16-2023 11:26 PM
@Nandini Raja Were you able to find a solution for this? We are trying to bulk copy files from S3 to ADLS Gen2, and dbutils being single-threaded is a pain. I even tried the Scala code that worked for you, but I get the error below:
Caused by: KeyProviderException: Failure to initialize configuration
Caused by: InvalidConfigurationValueException: Invalid configuration value detected for fs.azure.account.key
This may be because this config is not available at the executor level.
Were you able to figure out any solution?
12-05-2022 01:38 AM
Maybe we can try Hadoop's FileUtil.copy.
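For reference, a minimal sketch of what calling FileUtil.copy from a PySpark notebook could look like (the paths are placeholders, and this still runs on the driver, so by itself it parallelises nothing unless you wrap it in a thread pool):

# Sketch only: calling Hadoop's FileUtil.copy through py4j from the driver.
hadoop_conf = spark._jsc.hadoopConfiguration()
Path = spark._jvm.org.apache.hadoop.fs.Path
FileUtil = spark._jvm.org.apache.hadoop.fs.FileUtil

src = Path("s3a://source-bucket/data")  # placeholder source
dst = Path("abfss://container@account.dfs.core.windows.net/data")  # placeholder target

# copy(srcFS, src, dstFS, dst, deleteSource, conf) -> Boolean
FileUtil.copy(src.getFileSystem(hadoop_conf), src,
              dst.getFileSystem(hadoop_conf), dst,
              False, hadoop_conf)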
12-05-2022 04:27 AM
Could you use delta clone to copy tables?
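If the data is already in Delta tables, a rough sketch of a deep clone could look like the following (table names and location are placeholders):

# Sketch only: Databricks Delta DEEP CLONE; table names and location are placeholders.
spark.sql("""
  CREATE OR REPLACE TABLE target_db.my_table_clone
  DEEP CLONE source_db.my_table
  LOCATION 'abfss://container@account.dfs.core.windows.net/clones/my_table'
""")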
12-05-2022 08:00 AM
Yes, I think the best option will be to rebuild the code entirely and use, for example, COPY INTO (see the sketch after this list):
- dbutils uses just one core on the driver,
- RDD code is not optimized by Catalyst or AQE,
- high-level commands like COPY INTO are executed in a distributed way and are optimized.
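A minimal COPY INTO sketch, assuming the target is an existing Delta table; the source path, table name, and file format are placeholders:

# Sketch only: COPY INTO an existing Delta table; path, table, and format are placeholders.
spark.sql("""
  COPY INTO target_db.my_table
  FROM 's3://source-bucket/data/'
  FILEFORMAT = PARQUET
  COPY_OPTIONS ('mergeSchema' = 'true')
""")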
12-05-2022 08:01 AM
Alternatively, copy files using Azure Data Factory. It has great throughput.
12-05-2022 09:21 PM
You can't use dbutils commands inside the PySpark API (i.e., in code that runs on the executors). Try using an S3 copy or the equivalent in Azure.
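For example, a sketch of shelling out to the AWS CLI from the driver (this assumes the CLI is installed and configured on the cluster, which may not be the case; azcopy would be the Azure-side counterpart):

# Sketch only: recursive bucket-to-bucket copy via the AWS CLI; bucket names are placeholders.
import subprocess

subprocess.run(
    ["aws", "s3", "sync", "s3://source-bucket/data/", "s3://target-bucket/data/"],
    check=True,
)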
01-10-2023 12:14 PM
@Nandini Raja I did something similar by using shutil instead of dbutils. This worked for copying many local files to Azure Storage in parallel. However, the issue I'm having now is finding a Unity Catalog-friendly solution, as mounting Azure Storage isn't recommended. (shutil and os won't work with abfss:// paths.)
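Roughly, a sketch of that approach (directory names are placeholders, and the target is assumed to be a DBFS mount exposed under /dbfs so plain Python I/O can reach it):

# Sketch only: parallel local-file copy with shutil and a thread pool; paths are placeholders.
import os
import shutil
from concurrent.futures import ThreadPoolExecutor

src_dir = "/tmp/local_files"            # placeholder local source
dst_dir = "/dbfs/mnt/my_container/in"   # placeholder mounted target

def copy_one(name):
    shutil.copy(os.path.join(src_dir, name), os.path.join(dst_dir, name))

with ThreadPoolExecutor(max_workers=16) as pool:
    list(pool.map(copy_one, os.listdir(src_dir)))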
01-11-2023 02:33 AM
If you have a Spark session, you can use Spark's hidden Hadoop FileSystem API:
# Get FileSystem from SparkSession
fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(spark._jsc.hadoopConfiguration())
# Get Path class to convert string path to FS path
path = spark._jvm.org.apache.hadoop.fs.Path
# List files
fs.listStatus(path("/path/to/data")) # Should work with mounted points
# Rename file
fs.rename(path("OriginalName"), path("NewName"))
# Delete file
fs.delete(path("/path/to/data"))
# Upload file to DBFS root
fs.copyFromLocalFile(path(local_file_path), path(remote_file_path))
# Download file from DBFS
fs.copyToLocalFile(path(remote_file_path), path(local_file_path))
If you have Azure Storage, you should mount it to your cluster, and then you can access it with either `abfss://` or `/mnt/` paths.
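A minimal mount sketch, assuming account-key authentication (the storage account, container, secret scope, and key names are all placeholders; OAuth/service-principal configs look different):

# Sketch only: mounting an ADLS Gen2 container with an account key; all names are placeholders.
dbutils.fs.mount(
    source="abfss://mycontainer@myaccount.dfs.core.windows.net/",
    mount_point="/mnt/mycontainer",
    extra_configs={
        "fs.azure.account.key.myaccount.dfs.core.windows.net":
            dbutils.secrets.get(scope="my-scope", key="my-storage-key")
    },
)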