Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Tips for properly using large broadcast variables?

nthomas
New Contributor

I'm using a broadcast variable that's about 100 MB pickled in size, which I'm approximating with:

>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle     
>>> len(pickle.dumps(data))   
98888896

I'm running on a cluster with three c3.2xlarge executors and an m3.large driver, launching the interactive session with:

IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g

If I persist an RDD that holds references to this broadcast variable, memory usage explodes. For 100 references to a 100 MB variable, even if it were copied 100 times, I'd expect total usage of no more than 10 GB (well under the roughly 30 GB available across the 3 executors). However, I see out-of-memory errors when I run the following test:

data = list(range(int(10*1e6))) 
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist() 
print('count: {}'.format(joined_rdd.count()))

The stack trace:

TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
MemoryError

    at org.apache.spark.api.python.PythonRDD$anon$1.read(PythonRDD.scala:138)
    at org.apache.spark.api.python.PythonRDD$anon$1.<init>(PythonRDD.scala:179)
    at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:88)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)

16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-7a262fdfa561> in <module>()
      7 joined_rdd.persist()
      8 print('persist called')
----> 9 print('count: {}'.format(joined_rdd.count()))

/usr/lib/spark/python/pyspark/rdd.py in count(self)
   1004         3
   1005         """
-> 1006         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1007
   1008     def stats(self):

/usr/lib/spark/python/pyspark/rdd.py in sum(self)
    995         6.0
    996         """
--> 997         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
    998
    999     def count(self):

/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
    869         # zeroValue provided to each partition is unique from the one provided
    870         # to the final reduce call
--> 871         vals = self.mapPartitions(func).collect()
    872         return reduce(op, vals, zeroValue)
    873

/usr/lib/spark/python/pyspark/rdd.py in collect(self)
    771         """
    772         with SCCallSiteSync(self.context) as css:
--> 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    774         return list(_load_from_socket(port, self._jrdd_deserializer))
    775

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

I've seen previous threads about the memory usage of pickle deserialization being an issue. However, I would expect a broadcast variable to be deserialized (and loaded into memory on an executor) only once, with subsequent references to .value returning that in-memory object. That doesn't seem to be the case here. Am I missing something?

The examples I've seen use broadcast variables as dictionaries, applied once to transform a dataset (e.g. replacing airport acronyms with airport names). My motivation for persisting here is to create objects that know about a broadcast variable and how to interact with it, persist those objects, and then perform multiple computations using them (with Spark taking care of holding them in memory).
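
For concreteness, the kind of usage I have in mind looks roughly like this (the class and method names are just placeholders):

# data, metadata and ids as defined in the snippet above
class MetadataAware(object):
    """Holds only the Broadcast handle, never a copy of its value."""
    def __init__(self, bc):
        self.bc = bc

    def lookup(self, i):
        # I expect .value to be deserialized once per executor and then reused
        return self.bc.value[i]

helper = MetadataAware(metadata)

# multiple computations reusing the same helper and the same broadcast
total = ids.map(lambda kv: helper.lookup(kv[0])).sum()
matches = ids.filter(lambda kv: helper.lookup(kv[1]) >= 0).count()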

What are some tips for using large (100 MB+) broadcast variables? Is persisting an RDD that references a broadcast variable misguided? Is this possibly an issue specific to PySpark?

Thank you! Your help is appreciated.

Note: I've also posted this question on Stack Overflow.


_throwaway
New Contributor II

Does this answer your question: http://stackoverflow.com/a/37489738/1560062?
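
The short version, as I understand it (the sketch below is mine, not copied from that answer): when you persist joined_rdd, each cached row ends up carrying its own serialized copy of metadata.value rather than a shared reference, which is where the blow-up comes from. If you keep the large object behind the broadcast handle, dereference it inside your functions, and only persist small derived results, each worker only needs roughly one copy:

# data, metadata and ids as in the original post
def summarize(kv):
    key, _ = kv
    # touch the broadcast inside the function; don't put .value into the rows you persist
    return (key, len(metadata.value))

summary = ids.map(summarize)
summary.persist()            # only small (key, length) pairs get cached
print('count: {}'.format(summary.count()))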

nbajason
New Contributor II

Please help. We definitely need a best practice for this scenario: we need to load weights and biases from huge NumPy files (500 MB, or even 5 GB) and distribute them to each worker for inference.

FYI:

1. Load the huge files on each worker: each worker can load the files, but we hit lots of exceptions; in our tests roughly 20% of iterations failed this way.

2. Load the huge files on the driver: Spark can't broadcast objects that hold an '_io.FileIO' handle.

======== 1. Code ========

from pcml.models.multiTrialClassifier import MultiTrialClassifier

try:
    model = MultiTrialClassifier()
    model.setup_in_tf()
except:
    import tensorflow as tf
    tf.reset_default_graph()
    model = MultiTrialClassifier()
    model.setup_in_tf()

tdeNeuralClassifierModelBV = sc.broadcast(model)

======== 2. Error ========

configApiStack: qa
loading {'tokenizing': 'default', 'scale': 0.8756789083034756, 'tokenlist_ref': 'attrs_labels'}
loading {'tokenizing': 'default', 'scale': 2.618619150801559, 'tokenlist_ref': 'sch_labels'}
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/broadcast.py", line 83, in dump
    pickle.dump(value, f, 2)
TypeError: cannot serialize '_io.FileIO' object

PicklingError: Could not serialize broadcast: TypeError: cannot serialize '_io.FileIO' object

======== 3. Error details ========

PicklingError                             Traceback (most recent call last)
<command-1376727837833372> in <module>()
     39     model.setup_in_tf()
     40
---> 41 tdeNeuralClassifierModelBV = sc.broadcast(model)

/databricks/spark/python/pyspark/context.py in broadcast(self, value)
    797         be sent to each cluster only once.
    798         """
--> 799         return Broadcast(self, value, self._pickled_broadcast_vars)
    800
    801     def accumulator(self, value, accum_param=None):

/databricks/spark/python/pyspark/broadcast.py in __init__(self, sc, value, pickle_registry, path)
     72         if sc is not None:
     73             f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)
---> 74             self._path = self.dump(value, f)
     75             self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
     76             self.pickle_registry = pickle_registry

/databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
     88               % (e.__class__.__name__, _exception_message(e))
     89             print_exec(sys.stderr)
---> 90             raise pickle.PicklingError(msg)
     91         f.close()
     92         return f.name
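
One possible workaround, sketched below with hypothetical file paths, a hypothetical input_rdd, and placeholder load_params()/predict() methods: broadcast only the raw NumPy arrays (which pickle cleanly) and rebuild the TensorFlow model on the workers inside mapPartitions, so each partition reconstructs it once:

import numpy as np

# Driver side: load only the raw arrays; plain ndarrays serialize without trouble.
weights = np.load('/dbfs/tmp/weights.npy')        # hypothetical path
biases = np.load('/dbfs/tmp/biases.npy')          # hypothetical path
params_bv = sc.broadcast({'weights': weights, 'biases': biases})

def infer_partition(rows):
    # Worker side: rebuild the model once per partition from the broadcast arrays.
    import tensorflow as tf
    from pcml.models.multiTrialClassifier import MultiTrialClassifier
    tf.reset_default_graph()
    model = MultiTrialClassifier()
    model.setup_in_tf()
    # load_params() and predict() stand in for whatever the model actually exposes.
    model.load_params(params_bv.value['weights'], params_bv.value['biases'])
    for row in rows:
        yield model.predict(row)

predictions = input_rdd.mapPartitions(infer_partition)

For files in the multi-GB range a broadcast may still hit serialization limits, in which case placing the weight files on shared storage (e.g. DBFS) and loading them inside infer_partition on each worker is an alternative.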

