Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

How do I get a cartesian product of a huge dataset?

rlgarris
Databricks Employee

A cartesian product is a common operation to get the cross product of two tables.

For example, say you have a list of customers and a list of your product catalog and want to get the cross product of all customer - product combinations.

Cartesian products, however, can be a very expensive operation. Even with as few as 6,000 products and 100,000 customers, the output will be 600 million records (6K x 100K = 600M).
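(For a rough sense of scale: a cross join pairs every row on one side with every row on the other, so the output row count is simply the product of the two input row counts. A minimal sketch, using hypothetical "customers" and "products" tables matching the example above:)

val pairs = sqlContext.table("customers").join(sqlContext.table("products")) // join with no condition => cartesian product
// pairs.count() = 100,000 customers x 6,000 products = 600,000,000 rows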

1 ACCEPTED SOLUTION


miklos
Contributor

To do this effectively in Spark, try the following technique:

1. Cache the SMALLER of the two datasets (in this case the 6,000-item product catalog). When you cache it, repartition the data so that every worker holds some of it and you can use as many tasks as possible. Say your cluster has 25 workers with 4 cores per worker; you then want 4 x 25 = 100 partitions.

val products = sqlContext.table("products").repartition(100).cache()

2. Remember to call an action on the products DataFrame so that the caching occurs before the job starts.

products.take(1)

Check the Spark UI -> Storage tab to make sure the products DataFrame's cache has been distributed across all nodes.
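If the Storage tab shows the cache only partially populated, that may be because take(1) computes only the partitions it needs to return a single row; a full action such as count() forces every partition of products into the cache:

products.count() // materializes (and caches) all partitions, not just the first one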

3. Disable broadcast joins temporarily. Broadcast joins don't work well for cartesian products, because the workers receive so much broadcast data that they get stuck in an endless garbage-collection loop and never finish. Remember to turn broadcast joins back on when the query finishes.

%sql SET spark.sql.autoBroadcastJoinThreshold = 0
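If you prefer to do this from Scala in the same notebook, an equivalent sketch (10485760 bytes is the 10 MB default to restore afterwards):

sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "0") // disable broadcast joins
// ... run the cartesian product ...
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold", "10485760") // restore the 10 MB default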

4. Call join on the other table without a join condition. You don't need to cache the larger table, since caching it would take about as long as running the cross product itself.

val customers = sqlContext.table("customers")
val joined = customers.join(products)
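(If you are on Spark 2.1 or later, the same intent can be expressed explicitly; a sketch of the newer API:)

val joined = customers.crossJoin(products) // Spark 2.1+: explicit cross join, no join condition required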

5. Run explain on the DataFrame before executing it to confirm the plan contains a CartesianProduct operator.

joined.explain()

== Physical Plan ==
CartesianProduct
:- ConvertToSafe
:  +- Scan ParquetRelation[customer_key#45642,name#45643] InputPaths: dbfs:/tmp/customers
+- ConvertToSafe
   +- Scan ParquetRelation[product_key#45644,name#45645] InputPaths: dbfs:/tmp/products

6. Lastly, save your result.

joined.write.save(...path...)
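For example, writing it out as Parquet (the output path below is just a placeholder):

joined.write.mode("overwrite").parquet("dbfs:/tmp/customer_product_pairs") // hypothetical output path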

Happy Sparking!


5 REPLIES


danielrmeyer
New Contributor II

Does this answer still represent best practices? I am curious about the use of join rather than cartesian. Why not the following?

val joined = customers.cartesian(products)

danielrmeyer
New Contributor II

I am curious to know whether this is still the best recommendation for doing a large cartesian product in Spark. For example, is it better to use join rather than cartesian specifically? Why not val joined = customers.cartesian(products)?

Rajjat
New Contributor II

I also have the same problem. I have a big RDD and I want to calculate the similarity between its elements. When I take the cartesian product of this big RDD, it causes a lot of shuffles. Is there any way around this? Can we compare the elements without using cartesian?

val cart = bigRDD.cartesian(bigRDD)

@miklos, please help me regarding this.

