02-10-2016 10:07 AM
A Cartesian product is a common operation for getting the cross product of two tables.
For example, say you have a list of customers and a list of your product catalog and want the cross product of all customer-product combinations.
Cartesian products, however, can be a very expensive operation. Even with as few as 6,000 products and 100,000 customers, the output will be 600 million records (6K x 100K = 600M).
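To make the blow-up concrete, here is a minimal sketch with toy data (the DataFrames, column names, and row counts are made up for illustration, and a Spark 1.6-style sqlContext is assumed, as in the answer below). A join with no condition yields every customer-product pair, so the row count is the product of the two input sizes.

// Toy illustration only - the data and column names are hypothetical.
val customers = sqlContext.createDataFrame(Seq(
  (1, "Alice"), (2, "Bob"), (3, "Carol")
)).toDF("customer_key", "name")
val products = sqlContext.createDataFrame(Seq(
  (10, "Widget"), (20, "Gadget")
)).toDF("product_key", "product_name")
// A join with no condition is a cartesian product: 3 customers x 2 products = 6 rows.
val combos = customers.join(products)
combos.count()  // 6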
02-14-2016 02:31 PM
To do this effectively in Spark, try the following technique (a consolidated sketch of all the steps appears after step 6):
1. Cache the SMALLER of the two datasets (which in this case is the 6,000-item product catalog). When you cache, you will need to repartition the data so that there is data on every worker and you can use as many tasks as possible. Say that your cluster has 25 workers with 4 cores per worker; you therefore need 4 x 25 = 100 partitions.
val products = sqlContext.table("products").repartition(100).cache()
2. Remember to call an action on the products DataFrame so that the caching occurs before the job starts.
products.take(1)
Check the Spark UI -> Storage tab to make sure that the products DataFrame cache has been distributed across all nodes.
3. Disable broadcast joins temporarily. Broadcast joins don't work well for cartesian products because the workers receive so much broadcast data that they get stuck in an infinite garbage-collection loop and never finish. Remember to turn this back on when the query finishes.
%sql SET spark.sql.autoBroadcastJoinThreshold = 0
4. Call join on the other table without a join condition. You don't need to cache the larger table, since caching the bigger dataset would take just as long as running the cross product itself.
val customers = sqlContext.table("customers")
val joined = customers.join(products)
5. Run an explain plan on the DataFrame before executing to confirm you have a cartesian product operation.
joined.explain()

== Physical Plan ==
CartesianProduct
:- ConvertToSafe
:  +- Scan ParquetRelation[customer_key#45642,name#45643] InputPaths: dbfs:/tmp/customers
+- ConvertToSafe
   +- Scan ParquetRelation[product_key#45644,name#45645] InputPaths: dbfs:/tmp/products
6. Lastly save your result.
joined.write.save(...path...)
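Putting the steps above together, here is a minimal end-to-end sketch of the technique, assuming a Spark 1.6-era sqlContext and the table names used above; the output path and the 100-partition count are placeholders to adapt to your cluster.

// End-to-end sketch of the steps above (Spark 1.6-era DataFrame API).
// 1. Repartition and cache the smaller side, then force materialization with an action.
val products = sqlContext.table("products").repartition(100).cache()
products.take(1)
// 2. Disable broadcast joins for this job (same effect as the %sql SET above).
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "0")
// 3. Join with no condition to get the cartesian product.
val customers = sqlContext.table("customers")
val joined = customers.join(products)
// 4. Confirm that the physical plan contains CartesianProduct before running.
joined.explain()
// 5. Write out the result (the path here is a placeholder).
joined.write.save("dbfs:/tmp/customer_product_cross")
// 6. Restore the default broadcast threshold (10 MB) when the query finishes.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", (10L * 1024 * 1024).toString)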
Happy Sparking!
05-02-2017 11:03 AM
Does this answer still represent best practices? I am curious about the use of join rather than cartesian. Why not
val joined = customers.cartesian(products)
?
05-02-2017 09:46 AM
I am curious to know if this is still the best recommendation for doing a large cartesian product in Spark. For example, is it better to use 'join' rather than 'cartesian' specifically? Why not 'val joined = customers.cartesian(products)'?
02-24-2018 08:05 AM
I also have the same problem. I have a big RDD and I want to calculate the similarity between its elements. When I take the cartesian product of this big RDD, it causes a lot of shuffles. Is there any way around this? Can we compare the elements without using cartesian?
val cart = bigRDD.cartesian(bigRDD)
@Miklos, please help me regarding this.
05-10-2018 02:12 AM
Hi, this is a great, thoroughly explained write-up; keep up the good work.