Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Skewness / Salting with countDistinct

KosmaS
New Contributor III

Hey Everyone,

I'm experiencing data skew with the following aggregation:

 

from pyspark.sql.functions import count, countDistinct, lit

df = (source_df
    .unionByName(source_df.withColumn("region", lit("Country")))
    .groupBy("zip_code", "region", "device_type")
    .agg(countDistinct("device_id").alias("total_active_unique"), count("device_id").alias("total_active"))
)

Stats: [screenshot of stage metrics attached, 2024-08-05]

Is there a way to handle data skew when the aggregation needs a countDistinct, without affecting the results?

I understand how to handle skew by adding a salt column, and that seems to work fine with count.
But when countDistinct comes into the picture, salting seems to affect the results.
Is there some trick to still apply salting and keep countDistinct deterministic?
Or is there some other approach to data skew in such a case?

4 REPLIES

Retired_mod
Esteemed Contributor III

Hi @KosmaS, to address data skewness with `countDistinct`, you can use several techniques:

Double Aggregation involves salting the data, aggregating once with the salt included in the grouping keys, then dropping the salt and aggregating again, which spreads the skewed groups across more tasks.

HyperLogLog (HLL) provides approximate results for `countDistinct`, balancing accuracy and performance.

Broadcast Joins can help with small skewed datasets by avoiding the shuffle, while repartitioning the data on the skewed keys can distribute the load more evenly.

If you have any more details or specific constraints, feel free to share!

KosmaS
New Contributor III

Hey @Retired_mod,

Thanks for the reply. I spent some time with your response.

You're suggesting "double aggregation", and my guess is it should look more or less like this:

df = (source_df
    .unionByName(source_df.withColumn("region", lit("Country")))
    .groupBy("zip_code", "region", "device_type", "salt")
    .agg(countDistinct("device_id").alias("total_active_unique"), count("device_id").alias("total_active"))
    .groupBy("zip_code", "region", "device_type")
    .agg(countDistinct("device_id").alias("total_active_unique"), count("device_id").alias("total_active"))
)

 

So I can't see how the countDistinct value won't be affected by the salt. It's already affected at the first step (grouping with salt), so the second step will produce inaccurate results. Or should this be done a bit differently? Did you mean something else?

singhvikash86
New Contributor II

What if the salt is a function of device_id, e.g. hash(device_id) % 101, so the salt buckets hold mutually exclusive sets of devices? Then one more aggregation can sum those distinct counts, grouped by zip_code, region, device_type.

Avinash_Narala
Valued Contributor II

You can make use of the Databricks-native feature "Liquid Clustering": cluster the table by the columns you use in your grouping statements, and it will mitigate the performance issues caused by data skew.

For more information, see:

https://docs.databricks.com/en/delta/clustering.html
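As a hedged sketch of what that could look like (the table name and schema are made up; liquid clustering needs a Delta table on a supporting Databricks Runtime, so treat this as a config fragment rather than something runnable locally):

```python
# Hypothetical table; CLUSTER BY syntax per the Databricks clustering docs.
spark.sql("""
    CREATE TABLE active_devices (
        zip_code    STRING,
        region      STRING,
        device_type STRING,
        device_id   STRING
    )
    USING DELTA
    CLUSTER BY (zip_code, region, device_type)
""")
```

Note this helps with read/layout skew on those columns; it does not by itself change how the countDistinct shuffle distributes a hot key.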
