Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How to fetch files added with sc.addFile on a multi-node cluster

Mahesh_rathi__
Visitor

 

I wanted to ship nearly 12 XML files from a DBFS location to the executors' local paths using sc.addFile. I followed your blog and tweaked my code to build the path with file:///. The result was that it worked when the cluster has only one node, but it threw an error when multiple nodes are used, even though I am using SparkFiles.get to fetch the path.

 

code:
from pyspark import SparkFiles
patten_pool_path = "dbfs:/FileStore/Mrathi/pattern-pool"
sc.addFile(patten_pool_path, recursive=True)
patten_pool_path = SparkFiles.get("pattern-pool")
full_path = "file://" + patten_pool_path + "/"
print(full_path)  # output == file:///local_disk0/spark-7f135ab4-231f-4649-8571-f375f8ac738f/userFiles-de1552f4-8e94-4625-a2ca-e2…
rdd = sc.textFile(full_path)
head_rdd = rdd.pipe("head -n 5")
print(head_rdd.collect())

output :
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 16.0 failed 4 times, most recent failure: Lost task 5.3 in stage 16.0 (TID 278) (10.139.64.102 executor 0): java.io.FileNotFoundException: File file:/local_disk0/spark-7f135ab4-231f-4649-8571-f375f8ac738f/userFiles-de1552f4-8e94-4625-a2ca-e21ad2467b63/pattern-pool/Imaging_Measurement_Started_SubPattern_V1.xml does not exist

4 REPLIES

K_Anudeep
Databricks Employee

Hello @Mahesh_rathi__ ,

SparkContext.addFile is for shipping small side files to executors, not for creating an input path that you can pass to sc.textFile("file://...").

On a single-node cluster the driver and executor share the same machine, so the driver's local path "happens to work." In a multi-node cluster each executor has its own userFiles-<uuid> directory, so the driver-computed file:///local_disk0/... path won't exist on the other nodes, hence the FileNotFoundException.

 

You can skip addFile and the file:// scheme entirely. Read from DBFS directly, and Spark will parallelise it across executors. Does this not work for you?
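
For example, a minimal sketch using the directory from your post (take(5) here just stands in for the head -n 5 pipe):

pattern_pool_path = "dbfs:/FileStore/Mrathi/pattern-pool"

# Point Spark straight at the DBFS directory; Spark splits the read across
# executors, so there is no per-node local path to resolve.
rdd = sc.textFile(pattern_pool_path)   # one record per line, across all files
print(rdd.take(5))

# If you need one record per file (e.g. to parse each XML document whole):
# whole = sc.wholeTextFiles(pattern_pool_path)   # RDD of (path, content) pairs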

 

Currently the job reads these XML files from DBFS for every batch, which makes it a bit slow, so rather than reading them from DBFS each time I want to read them into executor memory.

Hi @Mahesh_rathi__ ,

If you want to read them into executor memory, could you broadcast the paths and then do the read on the executors?
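
One way to read that suggestion, sketched below: since the XML files are small, read them once on the driver and broadcast their contents so every executor keeps them in memory. This is only a sketch and assumes the driver can reach the files through the /dbfs FUSE mount; the variable names are illustrative.

import os

# Driver-side view of the DBFS directory via the /dbfs FUSE mount
xml_dir_local = "/dbfs/FileStore/Mrathi/pattern-pool"

# Read each small XML file once on the driver ...
xml_contents = {}
for name in os.listdir(xml_dir_local):
    if name.endswith(".xml"):
        with open(os.path.join(xml_dir_local, name), "r") as fh:
            xml_contents[name] = fh.read()

# ... and broadcast the contents so every executor holds them in memory
patterns_bc = sc.broadcast(xml_contents)

def apply_patterns(rows):
    patterns = patterns_bc.value   # in-memory {file_name: xml_text} dict on this executor
    for row in rows:
        # match each row against the broadcast patterns here
        yield row

# usage: result_rdd = some_rdd.mapPartitions(apply_patterns)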

@Mahesh_rathi__ ,

Sample code which might help you:

from pyspark import SparkFiles

xml_dir = "dbfs:/FileStore/Mrathi/pattern-pool"

# List the XML files in DBFS and ship each one to the executors via addFile
files = [f for f in dbutils.fs.ls(xml_dir) if f.name.endswith(".xml")]
for file in files:
    sc.addFile(file.path)

# Broadcast only the file names; each executor resolves its own local copy
file_names = [f.name for f in files]
files_bc = sc.broadcast(file_names)

def read_local_files(_):
    # This runs on executors. SparkFiles.get resolves the executor-local path.
    from pyspark import SparkFiles
    for name in files_bc.value:
        local_path = SparkFiles.get(name) 
        print(local_path)
        with open(local_path, "r") as fh:
            for line in fh:
                yield line

rdd = sc.parallelize([0], sc.defaultParallelism).flatMap(read_local_files)
print(rdd.take(5))

 

Let me know if it works

 
