
How do I group my dataset by a key or combination of keys without doing any aggregations using RDDs, DataFrames, and SQL?

cfregly
Contributor
 
4 REPLIES

cfregly
Contributor

RDDs

Before DataFrames, you would use RDD.groupBy() to group your data. This method is very expensive: it requires a complete reshuffle of all of your data to ensure that all records with the same key end up on the same Spark worker node.

Instead, if you're grouping for the purpose of aggregating data with something like sum() or count(), you should use RDD.aggregate(), PairRDDFunctions.groupByKey(), or PairRDDFunctions.reduceByKey().

Note: you need to import org.apache.spark.rdd.PairRDDFunctions.
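For example, here's a minimal sketch (assuming an existing SparkContext named sc) contrasting groupByKey() with reduceByKey(), which combines values on each partition before the shuffle:

// A minimal sketch; assumes an existing SparkContext named sc.
val purchases = sc.parallelize(Seq(("user1", 100.0), ("user1", 200.0), ("user2", 300.0)))

// Expensive: shuffles every record so all values for a key end up together.
val grouped = purchases.groupByKey()      // RDD[(String, Iterable[Double])]

// Preferred for aggregations: combines values per partition before the shuffle.
val totals = purchases.reduceByKey(_ + _) // RDD[(String, Double)]
totals.collect().foreach(println)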

DataFrames

At the moment, all DataFrame grouping operations assume that you're grouping for the purposes of aggregating data.

If you're looking to group for any other reason (not common), you'll need to get a reference to the underlying RDD as follows:

sessionsDF.rdd.groupBy(...)
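For example, here's a minimal sketch (using the sessionsDF defined just below) that collects the underlying Row objects for each (day, userId) pair without aggregating anything:

// Groups the raw Row objects by (day, userId); no aggregation is applied.
val groupedRows = sessionsDF.rdd.groupBy(row => (row.getString(0), row.getString(1)))
groupedRows.collect().foreach { case (key, rows) => println(s"$key -> ${rows.size} rows") }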

Otherwise, the DataFrame groupBy() method returns a GroupedData instance as follows:

import org.apache.spark.sql.GroupedData

// toDF() requires the SQLContext implicits, e.g. import sqlContext.implicits._
val sessionsDF = Seq(
  ("day1", "user1", "session1", 100.0),
  ("day1", "user1", "session2", 200.0),
  ("day2", "user1", "session3", 300.0),
  ("day2", "user1", "session4", 400.0),
  ("day2", "user1", "session4",  99.0)
).toDF("day", "userId", "sessionId", "purchaseTotal")

val groupedSessions: GroupedData = sessionsDF.groupBy("day", "userId")

At the moment, there's not much you can do with a GroupedData instance except add aggregations as follows:

import org.apache.spark.sql.functions._

val groupedSessionsDF = groupedSessions.agg($"day", $"userId", countDistinct("sessionId"), sum("purchaseTotal"))

There's also a convenience form of agg() that takes pairs of "column" to "aggregation type" as follows:

val groupedSessionsDF = groupedSessions.agg("sessionId" -> "count", "purchaseTotal" -> "sum")

(Note that the grouping columns can't be mixed into this pair syntax.) This convenience method doesn't currently support the complete list of aggregations, so you may need to revert to the slightly-less-convenient approach listed just above for things like countDistinct() and sumDistinct(), etc.
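For example, a minimal sketch of falling back to the Column-based agg() for an aggregation the pair syntax doesn't cover (sumDistinct here is just an illustrative choice):

// sumDistinct() isn't expressible as a "column" -> "aggType" pair,
// so use the Column-based agg() with org.apache.spark.sql.functions:
val distinctTotals = groupedSessions.agg(sumDistinct("purchaseTotal"))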

SQL

SQL/HiveQL requires that every selected column either appears in the GROUP BY clause or is wrapped in an aggregate function. For example, the following query fails:

%sql SELECT day, userId, sessionId, purchaseTotal FROM test_df_group_by GROUP BY day, userId

Error in SQL statement: java.lang.RuntimeException: org.apache.spark.sql.AnalysisException: expression 'purchaseTotal' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:40) at

Here's a version that works:

%sql SELECT day, userId, count(sessionId) as session_count, sum(purchaseTotal) as purchase_total FROM test_df_group_by GROUP BY day, userId
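As an aside, these SQL examples assume the DataFrame has been registered as a temporary table named test_df_group_by, for example (a minimal sketch using the Spark 1.x API; newer versions use createOrReplaceTempView):

// Makes sessionsDF queryable from SQL as test_df_group_by.
sessionsDF.registerTempTable("test_df_group_by")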

In general, prefer the DataFrame or SQL APIs over low-level RDDs. Especially for Python-based apps, you'll see a large performance boost from these higher-level libraries, because query optimization happens at those layers rather than in your application code.

zhuangmz
New Contributor II

Hi, @cfregly. Can I pass GroupedData as an input to KMeans? Or, more generally, can I apply a function that takes a grouped DataFrame as a parameter? Thanks.

Hi, I know that I'm posting to an old thread, but my question is as hot as ever 😉

I'm actually trying to aggregate over windows and calculate some aggregations (avg + stddev), and I also want to keep access to the original rows, so basically I just want to add my two aggregates to the existing rows in each window... any idea how to do this?

This is the post on Stack Overflow (which got downvoted for whatever reason), but feel free to answer here:

http://stackoverflow.com/questions/43014064/how-to-calculate-z-score-on-dataframe-api-in-apachespark...

@cfregly
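For what it's worth, one way to sketch this with the DataFrame API is window functions, which attach the per-window aggregates to every original row (a hypothetical example; the DataFrame df and the columns group and value are assumptions, not taken from the thread):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{avg, stddev, col}

// Hypothetical input: a DataFrame df with columns group and value.
val w = Window.partitionBy("group")

// avg/stddev over the window become new columns, so every original row is kept.
val withStats = df
  .withColumn("avg_value", avg(col("value")).over(w))
  .withColumn("stddev_value", stddev(col("value")).over(w))
  .withColumn("zscore", (col("value") - col("avg_value")) / col("stddev_value"))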

GeethGovindSrin
New Contributor II

@cfregly: For DataFrames, you can use the following PySpark code to use groupBy without aggregations.

Df.groupBy(Df["column_name"]).agg({})
