<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Tips for properly using large broadcast variables? in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/tips-for-properly-using-large-broadcast-variables/m-p/29707#M21418</link>
    <description></description>
    <pubDate>Fri, 06 Nov 2020 15:24:17 GMT</pubDate>
    <dc:creator>dewhhhdewggg</dc:creator>
    <dc:date>2020-11-06T15:24:17Z</dc:date>
    <item>
      <title>Tips for properly using large broadcast variables?</title>
      <link>https://community.databricks.com/t5/data-engineering/tips-for-properly-using-large-broadcast-variables/m-p/29703#M21414</link>
      <description>&lt;P&gt;I'm using a broadcast variable that is about 100 MB when pickled, which I'm approximating with:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;&amp;gt;&amp;gt;&amp;gt; data = list(range(int(10*1e6)))
&amp;gt;&amp;gt;&amp;gt; import cPickle as pickle
&amp;gt;&amp;gt;&amp;gt; len(pickle.dumps(data))
98888896&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;I'm running on a cluster with 3 c3.2xlarge executors and an m3.large driver, launching the interactive session with the following command:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;IPYTHON=1 pyspark --executor-memory 10G --driver-memory 5G --conf spark.driver.maxResultSize=5g&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;If I persist an RDD whose records reference this broadcast variable, memory usage explodes. With 100 references to a 100 MB variable, even if it were copied 100 times, I'd expect the data to take no more than 10 GB in total (let alone 30 GB over 3 nodes). However, I see out-of-memory errors when I run the following test:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;# About 100 MB when pickled (see the measurement above)
data = list(range(int(10*1e6)))
metadata = sc.broadcast(data)
# 100 (key, value) pairs
ids = sc.parallelize(zip(range(100), range(100)))
# Replace each value with the full broadcast payload
joined_rdd = ids.mapValues(lambda _: metadata.value)
joined_rdd.persist()
print('count: {}'.format(joined_rdd.count()))&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;The stack trace:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;TaskSetManager: Lost task 17.3 in stage 0.0 (TID 75, 10.22.10.13): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 111, in main
    process()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/worker.py", line 106, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 2355, in pipeline_func
    return func(split, prev_func(split, iterator))
  File "/usr/lib/spark/python/pyspark/rdd.py", line 317, in func
    return f(iterator)
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in &amp;lt;lambda&amp;gt;
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/pyspark/rdd.py", line 1006, in &amp;lt;genexpr&amp;gt;
    return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 139, in load_stream
    yield self._read_with_length(stream)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 164, in _read_with_length
    return self.loads(obj)
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 422, in loads
    return pickle.loads(obj)
MemoryError
at org.apache.spark.api.python.PythonRDD$anon$1.read(PythonRDD.scala:138)
at org.apache.spark.api.python.PythonRDD$anon$1.&amp;lt;init&amp;gt;(PythonRDD.scala:179)
at org.apache.spark.api.python.PythonRDD.compute(PythonRDD.scala:97)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:297)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at org.apache.spark.scheduler.Task.run(Task.scala:88)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
16/05/25 23:57:15 ERROR TaskSetManager: Task 17 in stage 0.0 failed 4 times; aborting job
---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
&amp;lt;ipython-input-1-7a262fdfa561&amp;gt; in &amp;lt;module&amp;gt;()
      7 joined_rdd.persist()
      8 print('persist called')
----&amp;gt; 9 print('count: {}'.format(joined_rdd.count()))

/usr/lib/spark/python/pyspark/rdd.py in count(self)
   1004         3
   1005         """
-&amp;gt; 1006         return self.mapPartitions(lambda i: [sum(1 for _ in i)]).sum()
   1007
   1008     def stats(self):

/usr/lib/spark/python/pyspark/rdd.py in sum(self)
    995         6.0
    996         """
--&amp;gt; 997         return self.mapPartitions(lambda x: [sum(x)]).fold(0, operator.add)
    998
    999     def count(self):

/usr/lib/spark/python/pyspark/rdd.py in fold(self, zeroValue, op)
    869         # zeroValue provided to each partition is unique from the one provided
    870         # to the final reduce call
--&amp;gt; 871         vals = self.mapPartitions(func).collect()
    872         return reduce(op, vals, zeroValue)
    873

/usr/lib/spark/python/pyspark/rdd.py in collect(self)
    771         """
    772         with SCCallSiteSync(self.context) as css:
--&amp;gt; 773             port = self.ctx._jvm.PythonRDD.collectAndServe(self._jrdd.rdd())
    774         return list(_load_from_socket(port, self._jrdd_deserializer))
    775

/usr/lib/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py in __call__(self, *args)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:259)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;I've seen previous threads about the memory usage of pickle deserialization being an issue. However, I would expect a broadcast variable to be deserialized (and loaded into memory on an executor) only once, and subsequent references to &lt;CODE&gt;.value&lt;/CODE&gt; to point to that same in-memory object. That doesn't seem to be the case. Am I missing something?&lt;/P&gt;&lt;P&gt;The examples I've seen use broadcast variables as dictionaries that are applied once to transform a data set (e.g. replacing airport acronyms with airport names). My motivation for persisting them here is to create objects that know about a broadcast variable and how to interact with it, persist those objects, and perform multiple computations with them (with Spark taking care of holding them in memory).&lt;/P&gt;&lt;P&gt;What are some tips for using large (100 MB+) broadcast variables? Is persisting a broadcast variable misguided? Is this an issue that is possibly specific to PySpark?&lt;/P&gt;&lt;P&gt;Thank you! Your help is appreciated.&lt;/P&gt;&lt;P&gt;Note: I've also posted this question to &lt;A href="https://stackoverflow.com" alt="https://stackoverflow.com" target="_blank"&gt;Stack Overflow&lt;/A&gt;.&lt;/P&gt;</description>
      <pubDate>Thu, 26 May 2016 18:27:38 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/tips-for-properly-using-large-broadcast-variables/m-p/29703#M21414</guid>
      <dc:creator>nthomas</dc:creator>
      <dc:date>2016-05-26T18:27:38Z</dc:date>
    </item>
    <item>
      <title>Re: Tips for properly using large broadcast variables?</title>
      <link>https://community.databricks.com/t5/data-engineering/tips-for-properly-using-large-broadcast-variables/m-p/29704#M21415</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;Does this answer your question: &lt;A href="http://stackoverflow.com/a/37489738/1560062" target="test_blank"&gt;http://stackoverflow.com/a/37489738/1560062&lt;/A&gt;? &lt;/P&gt; 
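&lt;P&gt;To make the memory arithmetic in the question concrete, here is a minimal, Spark-free sketch. It assumes, purely for illustration and not as a statement about PySpark internals, that every persisted record is pickled on its own, and compares that with records that carry only a key while the payload is stored once. The helper &lt;CODE&gt;pickled_size&lt;/CODE&gt; and the scaled-down list size are hypothetical choices made for this example.&lt;/P&gt;

```python
import pickle


def pickled_size(obj):
    """Return the number of bytes obj occupies when pickled (protocol 2)."""
    return len(pickle.dumps(obj, protocol=2))


# Scaled-down stand-in for the 10-million-element list in the question.
data = list(range(10000))
n_records = 100

# Assumption: each record is pickled independently, so every record
# embeds its own full copy of the payload.
embedded_total = sum(pickled_size((i, data)) for i in range(n_records))

# Alternative: records hold only a key, and the payload is pickled once.
keys_total = sum(pickled_size((i, None)) for i in range(n_records))
shared_total = keys_total + pickled_size(data)

ratio = embedded_total / float(shared_total)
print('embedded payload: {} bytes'.format(embedded_total))
print('keys plus one shared payload: {} bytes'.format(shared_total))
print('ratio: {:.1f}x'.format(ratio))
```

&lt;P&gt;Under that assumption, the serialized size grows with the number of records that embed the payload. In Spark terms, one option that may be worth trying is to persist only the keys and read &lt;CODE&gt;metadata.value&lt;/CODE&gt; inside the function that needs it, rather than returning it as a record value.&lt;/P&gt;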
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 27 May 2016 18:36:54 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/tips-for-properly-using-large-broadcast-variables/m-p/29704#M21415</guid>
      <dc:creator>_throwaway</dc:creator>
      <dc:date>2016-05-27T18:36:54Z</dc:date>
    </item>
    <item>
      <title>Re: Tips for properly using large broadcast variables?</title>
      <link>https://community.databricks.com/t5/data-engineering/tips-for-properly-using-large-broadcast-variables/m-p/29705#M21416</link>
      <description>&lt;P&gt;Please help.&lt;/P&gt;&lt;P&gt;We definitely need a best practice for this scenario: we need to load weights and biases from huge NumPy files (500 MB or 5 GB) and distribute them to each worker for inference.&lt;/P&gt;&lt;P&gt;FYI, we tried two approaches:&lt;/P&gt;&lt;P&gt;1. Load the huge file in each worker: each worker can load the files, but this raises many exceptions. In our tests, about 20% of iterations hit such an exception.&lt;/P&gt;&lt;P&gt;2. Load the huge file in the driver: Spark can't broadcast '_io.FileIO' objects.&lt;/P&gt;&lt;P&gt;======== 1. Code ========&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;from pcml.models.multiTrialClassifier import MultiTrialClassifier

try:
    model = MultiTrialClassifier()
    model.setup_in_tf()
except:
    # Retry once after resetting the default TensorFlow graph
    import tensorflow as tf
    tf.reset_default_graph()
    model = MultiTrialClassifier()
    model.setup_in_tf()

tdeNeuralClassifierModelBV = sc.broadcast(model)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;======== 2. Error ========&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;configApiStack: qa
loading {'tokenizing': 'default', 'scale': 0.8756789083034756, 'tokenlist_ref': 'attrs_labels'}
loading {'tokenizing': 'default', 'scale': 2.618619150801559, 'tokenlist_ref': 'sch_labels'}
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/broadcast.py", line 83, in dump
    pickle.dump(value, f, 2)
TypeError: cannot serialize '_io.FileIO' object

PicklingError: Could not serialize broadcast: TypeError: cannot serialize '_io.FileIO' object&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;======== 3. Error details ========&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;PicklingError                             Traceback (most recent call last)
&amp;lt;command-1376727837833372&amp;gt; in &amp;lt;module&amp;gt;()
     39     model.setup_in_tf()
     40
---&amp;gt; 41 tdeNeuralClassifierModelBV = sc.broadcast(model)

/databricks/spark/python/pyspark/context.py in broadcast(self, value)
    797         be sent to each cluster only once.
    798         """
--&amp;gt; 799         return Broadcast(self, value, self._pickled_broadcast_vars)
    800
    801     def accumulator(self, value, accum_param=None):

/databricks/spark/python/pyspark/broadcast.py in __init__(self, sc, value, pickle_registry, path)
     72         if sc is not None:
     73             f = NamedTemporaryFile(delete=False, dir=sc._temp_dir)
---&amp;gt; 74             self._path = self.dump(value, f)
     75             self._jbroadcast = sc._jvm.PythonRDD.readBroadcastFromFile(sc._jsc, self._path)
     76             self._pickle_registry = pickle_registry

/databricks/spark/python/pyspark/broadcast.py in dump(self, value, f)
     88                   % (e.__class__.__name__, _exception_message(e))
     89             print_exec(sys.stderr)
---&amp;gt; 90             raise pickle.PicklingError(msg)
     91         f.close()
     92         return f.name&lt;/CODE&gt;&lt;/PRE&gt;</description>
      <pubDate>Tue, 19 Dec 2017 20:05:10 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/tips-for-properly-using-large-broadcast-variables/m-p/29705#M21416</guid>
      <dc:creator>nbajason</dc:creator>
      <dc:date>2017-12-19T20:05:10Z</dc:date>
    </item>
  </channel>
</rss>

