RDDs
Before DataFrames, you would use RDD.groupBy() to group your data. This method is very expensive because it requires a complete reshuffle of all of your data to ensure that all records with the same key end up on the same Spark worker node.
Instead, you should use RDD.aggregate(), PairRDDFunctions.groupByKey(), or PairRDDFunctions.reduceByKey() if you're grouping for the purpose of aggregating data with functions such as sum() or count().
Note: You need to import org.apache.spark.rdd.PairRDDFunctions
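For example, here is a minimal sketch (not from the original code, and assuming a SparkContext named sc as in a typical notebook) of summing purchase totals per (day, userId) key with reduceByKey(); the sample values are made up for illustration:
// hypothetical pair RDD keyed by (day, userId)
val purchases = sc.parallelize(Seq(
  (("day1", "user1"), 100.0),
  (("day1", "user1"), 200.0),
  (("day2", "user1"), 300.0)))
// reduceByKey() pre-aggregates locally on each partition, so far less data
// crosses the network than with groupBy()/groupByKey()
val totals = purchases.reduceByKey(_ + _)  // e.g. (("day1","user1"), 300.0)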
DataFrames
At the moment, all DataFrame grouping operations assume that you're grouping for the purposes of aggregating data.
If you're looking to group for any other reason (not common), you'll need to get a reference to the underlying RDD as follows:
sessionsDF.rdd.groupBy(...)
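As a sketch (using the sessionsDF defined just below), grouping the underlying RDD of Rows by day would look something like this:
// returns RDD[(String, Iterable[Row])] -- every Row for a given day is collected together
val sessionsByDay = sessionsDF.rdd.groupBy(row => row.getAs[String]("day"))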
Otherwise, the DataFrame groupBy() method returns a GroupedData instance as follows:
import org.apache.spark.sql.GroupedData
// provides .toDF() on a local Seq and the $"column" syntax (usually pre-imported in notebooks)
import sqlContext.implicits._

val sessionsDF = Seq(("day1","user1","session1",100.0), ("day1","user1","session2",200.0),
                     ("day2","user1","session3",300.0), ("day2","user1","session4",400.0),
                     ("day2","user1","session4",99.0))
  .toDF("day","userId","sessionId","purchaseTotal")

val groupedSessions: GroupedData = sessionsDF.groupBy("day", "userId")
At the moment, there's not much you can do with a GroupedData instance except apply aggregations as follows:
import org.apache.spark.sql.functions._

val groupedSessionsDF = groupedSessions.agg($"day", $"userId", countDistinct("sessionId"), sum("purchaseTotal"))
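With the sample data above, calling groupedSessionsDF.show() should produce two rows: (day1, user1) with 2 distinct sessions and a purchase total of 300.0, and (day2, user1) with 2 distinct sessions (session4 appears twice) and a purchase total of 799.0.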
There's also a convenience form of agg() that takes column-name to aggregation-type pairs (or a Map of them) as follows:
val groupedSessionsDF = groupedSessions.agg("sessionId" -> "count", "purchaseTotal" -> "sum")
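If you prefer, the same call can be written with an actual Map (a small sketch; the groupedSessionsDF2 name is just for illustration):
// Map overload of agg(): column name -> aggregation type
val groupedSessionsDF2 = groupedSessions.agg(Map("sessionId" -> "count", "purchaseTotal" -> "sum"))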
These string-based convenience forms don't currently support the complete list of aggregations, so you may need to revert to the slightly-less-convenient Column-based approach shown just above for things like countDistinct() and sumDistinct().
SQL
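The examples below query a temporary table named test_df_group_by. As a minimal bridge from the DataFrame above (assuming the Spark 1.x notebook environment used throughout), you could register sessionsDF under that name:
// expose sessionsDF to SQL under the table name used in the queries below
sessionsDF.registerTempTable("test_df_group_by")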
SQL/HiveQL requires that every selected column either appears in the GROUP BY clause or is wrapped in an aggregate function. The following query fails because purchaseTotal is neither:
%sql SELECT day, userId, sessionId, purchaseTotal FROM test_df_group_by GROUP BY day, userId
Error in SQL statement: java.lang.RuntimeException: org.apache.spark.sql.AnalysisException: expression 'purchaseTotal' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.;
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:40)
...
Here's a version that works...
%sql SELECT day, userId, count(sessionId) as session_count, sum(purchaseTotal) as purchase_total FROM test_df_group_by GROUP BY day, userId
You almost always want to use the DataFrame or SQL APIs over low-level RDDs. Especially for Python-based apps, you'll see a large performance boost from these higher-level libraries thanks to the optimizations performed at those layers.