05-26-2016 11:27 AM
I'm using a broadcast variable about 100 MB pickled in size, which I'm approximating with:
>>> data = list(range(int(10*1e6)))
>>> import cPickle as pickle
>>> len(pickle.dumps(data))
98888896
Running on a cluster with 3 c3.2xlarge executors and an m3.large driver, launching the interactive session with:
IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g
In an RDD, if I persist a reference to this broadcast variable, the memory usage explodes. For 100 references to a 100 MB variable, even if it were copied 100 times, I'd expect the data usage to be no more than 10 GB total (let alone 30 GB over 3 nodes). However, I see out of memory errors when I run the following test:
data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
ids = sc.parallelize(zip(range(100), range(100)))
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))
The stack trace:
TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
process()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
return func(split, prev_func(split, iterator))
File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
return f(iterator)
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <lambda>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in <genexpr>
return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
yield self._read_with_length(stream)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
return self.loads(obj)
File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
return pickle.loads(obj)
MemoryError
at org.apache.spark.api.python.PythonRDD$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$anon$1.<init>(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-1-7a262fdfa561> in <module>()
7 joined_rdd.persist()
8 print('persist called')
----> 9 print('count: {}'.format(joined_rdd.count()))
/usr/lib/spark/python/pyspark/rdd.py in count(self)
1004 3
1005 """
-> 1006 return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
1007
1008 def stats(self):
/usr/lib/spark/python/pyspark/rdd.py in sum(self)
995 6.0
996 """
--> 997 return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
998
999 def count(self):
/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
869 # zeroValue provided to each partition is unique from the one provided
870 # to the final reduce call
--> 871 vals = self.mapPartitions(func).collect()
872 return reduce(op, vals, zeroValue)
873
/usr/lib/spark/python/pyspark/rdd.py in collect(self)
771 """
772 with SCCallSiteSync(self.context) as css:
--> 773 port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
774 return list(_load_from_socket(port, self._jrdd_deserializer))
775
/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
I've seen previous threads about the memory cost of pickle deserialization being an issue. However, I would expect a broadcast variable to be deserialized (and loaded into memory on an executor) only once, with subsequent references to .value pointing at that in-memory object. That doesn't seem to be the case. Am I missing something?
The examples I've seen use broadcast variables as dictionaries, applied once to transform a set of data (e.g. replacing airport acronyms with airport names). My motivation for persisting them here is to create objects that know about a broadcast variable and how to interact with it, persist those objects, and perform multiple computations using them (with Spark taking care of holding them in memory).
What are some tips for using large (100 MB+) broadcast variables? Is persisting a broadcast variable misguided? Is this an issue that is possibly specific to PySpark?
Thank you! Your help is appreciated.
Note, I've also posted this question to stackoverflow.
05-27-2016 11:36 AM
Does this answer your question: http://stackoverflow.com/a/37489738/1560062?
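The gist of that answer can be illustrated in plain Python, with no Spark required (a minimal sketch, scaled down from the 100 MB value in the question): when a row of the RDD embeds metadata.value, each persisted row serializes to roughly the full size of the broadcast value, and rows deserialized independently, as PySpark does per batch and partition, no longer share memory with one another.

```python
import pickle

# Scaled-down stand-in for the ~100 MB broadcast value in the question.
data = list(range(1000000))

# One element of joined_rdd after mapValues: the row embeds the full value.
row = (0, data)

# Pickling the row costs roughly as much as pickling the value itself,
# so a persisted RDD pays that cost once per row, not once per executor.
assert len(pickle.dumps(row, 2)) >= len(pickle.dumps(data, 2))

# Rows round-tripped through pickle independently (as happens when a
# persisted RDD is read back) are equal but distinct objects in memory:
copies = [pickle.loads(pickle.dumps(row, 2))[1] for _ in range(3)]
assert all(c == data and c is not data for c in copies)
```

The practical consequence: keep the broadcast value out of the persisted rows and dereference metadata.value only inside the closures you run, so each executor holds a single copy.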
12-19-2017 12:05 PM
Please help.
We definitely need a best practice for this scenario: we have to load weights and biases from huge numpy files (500 MB, or even 5 GB) and distribute them to each worker for inference.
FYI:
1. Loading the huge files in each worker: every worker can load the files, but it throws lots of exceptions; in our tests roughly 20% of iterations hit one.
2. Loading the huge files in the driver: Spark can't broadcast '_io.FileIO' variables.
========1. Code========
from pcml.models.multiTrialClassifier import MultiTrialClassifier
try:
    model = MultiTrialClassifier()
    model.setup_in_tf()
except:
    import tensorflow as tf
    tf.reset_default_graph()
    model = MultiTrialClassifier()
    model.setup_in_tf()
tdeNeuralClassifierModelBV = sc.broadcast(model)
========2. Error========
configApiStack: qa
loading {'tokenizing': 'default', 'scale': 0.8756789083034756, 'tokenlist_ref': 'attrs_labels'}
loading {'tokenizing': 'default', 'scale': 2.618619150801559, 'tokenlist_ref': 'sch_labels'}
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/broadcast.py", line 83, in dump
    pickle.dump(value, f, 2)
TypeError: cannot serialize '_io.FileIO' object
PicklingError: Could not serialize broadcast: TypeError: cannot serialize '_io.FileIO' object
=======3. Error details======
PicklingError Traceback (most recent call last)
<command-1376727837833372> in <module>()
     39     model.setup_in_tf()
     40
---> 41 tdeNeuralClassifierModelBV = sc.broadcast(model)
/databricks/spark/python/pyspark/context.py in broadcast(self, value)
    797         be sent to each cluster only once.
    798         """
--> 799         return Broadcast(self, value, self._pickled_broadcast_vars)
    800
    801     def accumulator(self, value, accum_param=None):
/databricks/spark/python/pyspark/broadcast.py in __init__(self, sc, value, pickle_registry, path)
     72         if sc is not None:
     73             f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)
---> 74             self._path = self.dump(value, f)
     75             self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
     76             self._pickle_registry = pickle_registry
/databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
     88                   % (e.__class__.__name__, _exception_message(e))
     89             print_exec(sys.stderr)
---> 90             raise pickle.PicklingError(msg)
     91         f.close()
     92         return f.name
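The TypeError above arises because sc.broadcast pickles its argument, and the model object holds an open file handle, which Python's pickle cannot serialize. A workaround consistent with this (a hedged sketch, not the poster's actual MultiTrialClassifier code) is to load the .npy weights into plain NumPy arrays on the driver, broadcast only those arrays, and rebuild the model from the broadcast value on each worker:

```python
import io
import pickle

import numpy as np

# Stand-in for one of the large .npy weight files from the post,
# written to an in-memory buffer so the sketch is self-contained.
buf = io.BytesIO()
np.save(buf, np.arange(16, dtype=np.float64).reshape(4, 4))
buf.seek(0)

# Load into a plain ndarray on the driver: ndarrays pickle cleanly,
# which is exactly what sc.broadcast requires.
weights = np.load(buf)
params = {"weights": weights}
restored = pickle.loads(pickle.dumps(params, 2))
assert (restored["weights"] == weights).all()

# An open file-like object, by contrast, fails just like the traceback above:
try:
    pickle.dumps(buf)
except TypeError:
    pass  # cannot pickle an _io.* object

# With a SparkContext `sc`, broadcast the arrays, not the model object
# that holds the file handles:
# params_bv = sc.broadcast(params)
# ...then rebuild the model from params_bv.value inside each worker.
```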