05-26-2016 11:27 AM
I'm using a broadcast variable about 100 MB pickled in size, which I'm approximating with:
>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle     
>>> len(pickle.dumps(data))   
98888896
Running on a cluster with 3 c3.2xlarge executors and an m3.large driver, with the following command launching the interactive session:
IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g
In an RDD, if I persist a reference to this broadcast variable, the memory usage explodes. For 100 references to a 100 MB variable, even if it were copied 100 times, I'd expect the data usage to be no more than 10 GB total (let alone 30 GB over 3 nodes). However, I see out of memory errors when I run the following test:
data = list(range(int(10*1e6))) 
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist() 
print('count: {}'.format(joined_rdd.count()))
The stack trace:
TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13):     org.apache.spark.api.python.PythonException: Traceback (most recent call last):    
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main        
process()    
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func        
return func(split, prev_func(split, iterator))     
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func        
return func(split, prev_func(split, iterator))      
File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func        
return f(iterator)      
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>        
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()      
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>        
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()      
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)      
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads       
return pickle.loads(obj)    
MemoryError      
at org.apache.spark.api.python.PythonRDD$anon$1.read(PythonRDD.scala:138)      
at org.apache.spark.api.python.PythonRDD$anon$1.<init>(PythonRDD.scala:179)     
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)      
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)      
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)      
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)      
at org.apache.spark.scheduler.Task.run(Task.scala:88)      
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)      
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)      
at org.apache.spark.scheduler.Task.run(Task.scala:88)      
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)      
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)      
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)      
at java.lang.Thread.run(Thread.java:745)
16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-7a262fdfa561> in <module>()         
            7 joined_rdd.persist()          
            8 print('persist called')    
            ----> 9 print('count: {}'.format(joined_rdd.count()))    
/usr/lib/spark/python/pyspark/rdd.py in count(self)       
1004         3       
1005         """    
-> 1006         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()       
1007       
1008     def stats(self):    
/usr/lib/spark/python/pyspark/rdd.py in sum(self)        
995         6.0        
996         """    
--> 997         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)        
998        
999     def count(self):    
/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)        
869         # zeroValue provided to each partition is unique from the one provided        
870         # to the final reduce call    
--> 871         vals = self.mapPartitions(func).collect()        
872         return reduce(op, vals, zeroValue)        
873    
/usr/lib/spark/python/pyspark/rdd.py in collect(self)        
771         """        
772         with SCCallSiteSync(self.context) as css:    
--> 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())       
 774         return list(_load_from_socket(port, self._jrdd_deserializer))        
775
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)      
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)      
at py4j.Gateway.invoke(Gateway.java:259)      
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)      
at py4j.commands.CallCommand.execute(CallCommand.java:79)      
at py4j.GatewayConnection.run(GatewayConnection.java:207)      
at java.lang.Thread.run(Thread.java:745)
I've seen previous threads about the memory usage of pickle deserialization being an issue. However, I would expect a broadcast variable to only be deserialized (and loaded into memory on an executor) once, and subsequent references to .value to reference that in-memory address. That doesn't seem to be the case, however. Am I missing something?
The examples I've seen with broadcast variables have them as dictionaries, used one time to transform a set of data (e.g. replace airport acronyms with airport names). The motivation behind persisting them here is to create objects with knowledge of a broadcast variable and how to interact with it, persist those objects, and perform multiple computations using them (with Spark taking care of holding them in memory).
What are some tips for using large (100 MB+) broadcast variables? Is persisting a broadcast variable misguided? Is this an issue that is possibly specific to PySpark?
Thank you! Your help is appreciated.
Note, I've also posted this question to stackoverflow.
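For anyone trying to reason about the numbers in this question, here is a minimal pure-Python sketch (no Spark involved) of two effects that may contribute to the gap between "100 MB pickled" and the memory actually used. It assumes a scaled-down list of 100,000 integers in place of the 10-million-element list, and the helper name deep_size_of_int_list is made up for this illustration. It shows that the in-memory size of a list of ints is larger than its pickled size, and that records pickled one at a time each carry their own copy of a shared object. Whether and how often PySpark serializes records separately depends on its batching, so treat this as an illustration of the pickle behavior only, not as a description of Spark internals.

```python
import pickle
import sys


def deep_size_of_int_list(values):
    """Approximate in-memory bytes: the list object plus each int object."""
    return sys.getsizeof(values) + sum(sys.getsizeof(v) for v in values)


# Scaled-down stand-in for the 10-million-element list in the question.
payload = list(range(100000))

pickled_size = len(pickle.dumps(payload, protocol=2))
in_memory_size = deep_size_of_int_list(payload)

# Ten records that all point at the same payload object.
records = [(i, payload) for i in range(10)]

# Pickled together, the shared payload is written once (pickle memoizes it).
together = len(pickle.dumps(records, protocol=2))

# Pickled one record at a time, the payload is written again for every record.
separately = sum(len(pickle.dumps(r, protocol=2)) for r in records)

print("pickled payload bytes:     ", pickled_size)
print("approx. in-memory bytes:   ", in_memory_size)
print("10 records pickled at once:", together)
print("10 records pickled singly: ", separately)
```

If this is what is happening, a record that stores metadata.value holds the data itself rather than a lightweight pointer to the broadcast, so keeping only the ids in the persisted RDD and reading metadata.value inside the function that needs it may be worth trying.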
05-27-2016 11:36 AM
Does this answer your question: http://stackoverflow.com/a/37489738/1560062?
12-19-2017 12:05 PM
Please help.....
We definitely need a best practice for this scenario: we need to load weights and biases from huge numpy files (500M, or 5G) and distribute them to each worker for inference.
FYI, what we have tried:
1. Load the huge files in each worker: every worker can load the files, but this produces many exceptions. In our tests, about 20% of iterations hit such an exception.
2. Load the huge files in the driver: Spark can't broadcast '_io.FileIO' objects.
========1. Codes========
from pcml.models.multiTrialClassifier import MultiTrialClassifier
try:
    model = MultiTrialClassifier()
    model.setup_in_tf()
except:
    import tensorflow as tf
    tf.reset_default_graph()
    model = MultiTrialClassifier()
    model.setup_in_tf()
tdeNeuralClassifierModelBV = sc.broadcast(model)
========2. Error=========
configApiStack: qa
loading {'tokenizing': 'default', 'scale': 0.8756789083034756, 'tokenlist_ref': 'attrs_labels'}
loading {'tokenizing': 'default', 'scale': 2.618619150801559, 'tokenlist_ref': 'sch_labels'}
Traceback (most recent call last):
File "/databricks/spark/python/pyspark/broadcast.py", line 83, in dump
pickle.dump(value, f, 2)
TypeError: cannot serialize '_io.FileIO' object
PicklingError: Could not serialize broadcast: TypeError: cannot serialize '_io.FileIO' object
=======3. Error details======
PicklingError                             Traceback (most recent call last)
<command-1376727837833372> in <module>()
39 model.setup_in_tf()
40
---> 41 tdeNeuralClassifierModelBV = sc.broadcast(model)
/databricks/spark/python/pyspark/context.py in broadcast(self, value)
797         be sent to each cluster only once.
798         """
--> 799         return Broadcast(self, value, self._pickled_broadcast_vars)
800
801     def accumulator(self, value, accum_param=None):
/databricks/spark/python/pyspark/broadcast.py in __init__(self, sc, value, pickle_registry, path)
72         if sc is not None:
73             f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)
---> 74             self._path = self.dump(value, f)
75             self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
76             self._pickle_registry = pickle_registry
/databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
88                       % (e.__class__.__name__, _exception_message(e))
89             print_exec(sys.stderr)
---> 90             raise pickle.PicklingError(msg)
91         f.close()
92         return f.name
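The error above suggests that something reachable from model holds an open '_io.FileIO' handle, and pickle refuses to serialize open file handles. Below is a minimal pure-Python sketch of that failure and of one possible workaround: read the file contents first and pickle the loaded data instead of the handle. The temporary file and the variable names are made up for this illustration, and nothing here is taken from the MultiTrialClassifier code, which is not shown in the post.

```python
import io
import os
import pickle
import tempfile

# Small temporary file standing in for a (much larger) weights file.
with tempfile.NamedTemporaryFile(delete=False, suffix=".bin") as tmp:
    tmp.write(b"\x00\x01\x02\x03")
    weights_path = tmp.name

handle = io.FileIO(weights_path, "r")

# Pickling the open handle fails with a TypeError, as in the traceback above.
try:
    pickle.dumps(handle, 2)
    handle_error = None
except TypeError as exc:
    handle_error = exc

# Pickling the bytes read from the handle works, because bytes are plain data.
contents = handle.read()
restored = pickle.loads(pickle.dumps(contents, 2))

# Clean up the handle and the temporary file.
handle.close()
os.unlink(weights_path)

print("error when pickling the handle:", handle_error)
print("round-tripped contents match:  ", restored == contents)
```

Applied to the case above, this would mean passing only already-loaded data (for example the arrays themselves) to sc.broadcast and rebuilding the model object on the workers. Whether that is practical for multi-gigabyte weight files is a separate question that this sketch does not answer.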