Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Skewness / Salting with countDistinct

KosmaS
New Contributor III

Hey Everyone,

I'm running into data skew in the following aggregation:

 

from pyspark.sql.functions import count, countDistinct, lit

df = (source_df
    .unionByName(source_df.withColumn("region", lit("Country")))
    .groupBy("zip_code", "region", "device_type")
    .agg(countDistinct("device_id").alias("total_active_unique"), count("device_id").alias("total_active")))

 

 

Stats: [image: Screenshot 2024-08-05 at 17.24.08.png]

Is there a way to handle data skew when the aggregation includes a countDistinct, without affecting the results?

I understand how to handle skew by salting, and that works fine with count.
But once countDistinct is involved, salting seems to change the results.
Is there a trick for applying salting while still getting deterministic, correct results for countDistinct?
Or is there another approach to data skew that fits this case?
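
To make the problem concrete, here is a small plain-Python illustration (no Spark needed) of why a random salt inflates a distinct count when the per-salt results are summed. The helper name, the bucket count, and the sample data are all made up for the illustration.

```python
import random

def summed_distinct_with_random_salt(device_ids, num_salts, seed=0):
    """Mimic salting with a random bucket per row, then summing per-bucket distinct counts."""
    rng = random.Random(seed)
    buckets = [set() for _ in range(num_salts)]
    for device_id in device_ids:
        # Each row gets its own random salt, so one device can land in several buckets.
        buckets[rng.randrange(num_salts)].add(device_id)
    return sum(len(bucket) for bucket in buckets)

# Hypothetical skewed group: device "a" dominates the rows.
events = ["a"] * 50 + ["b"] * 30 + ["c"]

exact = len(set(events))                                    # true distinct count
salted = summed_distinct_with_random_salt(events, num_salts=4)

print("exact:", exact, "summed over random salts:", salted)
```

The same device is counted once per bucket it appears in, so the summed value overshoots the exact one. A plain count does not have this problem because row counts are additive across buckets no matter how the rows are split.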

2 REPLIES

Kaniz_Fatma
Community Manager

Hi @KosmaS, To address data skewness with `countDistinct`, you can use several techniques:

Double Aggregation involves salting the data, performing an aggregation, then removing the salt and aggregating again to reduce skewness.

HyperLogLog (HLL) provides approximate results for `countDistinct`, balancing accuracy and performance.

Broadcast joins can help when a skewed dataset is joined to a small one, since they avoid the shuffle, while repartitioning the data on the skewed keys can distribute the load more evenly.

If you have any more details or specific constraints, feel free to share!

KosmaS
New Contributor III

Hey @Kaniz_Fatma 

Thanks for the reply. I spent some time thinking through your response.

You're suggesting 'double aggregation', which I'm guessing would look more or less like this:

df = (source_df
    .unionByName(source_df.withColumn("region", lit("Country")))
    # "salt" is assumed to be a column added beforehand (e.g. a random bucket number)
    .groupBy("zip_code", "region", "device_type", "salt")
    .agg(countDistinct("device_id").alias("total_active_unique"), count("device_id").alias("total_active"))
    .groupBy("zip_code", "region", "device_type")
    .agg(countDistinct("device_id").alias("total_active_unique"), count("device_id").alias("total_active")))

 

I can't see how the countDistinct value would be unaffected by the salt. It is already affected in the first step (grouped with the salt), so the second step will produce inaccurate results. Should this be done a bit differently, or did you mean something else?
