05-26-2015 11:38 AM
05-26-2015 03:49 PM
RDDs
Before DataFrames, you would use RDD.groupBy() to group your data. This method is very expensive and requires a complete reshuffle of all of your data to ensure all records with the same key end up on the same Spark Worker Node. Instead, you should use RDD.aggregate(), PairRDDFunctions.groupByKey(), or PairRDDFunctions.reduceByKey() if you're grouping for the purposes of aggregating data such as sum() or count().
Note: You need to import org.apache.spark.rdd.PairRDDFunctions
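For example, here's a minimal sketch (with made-up data) of summing values per key with reduceByKey, which pre-aggregates on each partition before the shuffle instead of moving every individual record across the network:

// Hypothetical example data: (userId, purchaseTotal) pairs
val purchases = sc.parallelize(Seq(("user1", 100.0), ("user1", 200.0), ("user2", 300.0)))

// reduceByKey combines values per key map-side, then shuffles only the partial sums
val totalsPerUser = purchases.reduceByKey(_ + _)  // RDD[(String, Double)]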
DataFrames
At the moment, all DataFrame grouping operations assume that you're grouping for the purposes of aggregating data.
If you're looking to group for any other reason (not common), you'll need to get a reference to the underlying RDD as follows:
sessionsDF.rdd.groupBy(...)
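For example, a minimal sketch (reusing the sessionsDF defined below) that groups the full rows by userId without any aggregation:

// Group whole Row objects by userId (no aggregation involved)
val sessionsByUser = sessionsDF.rdd.groupBy(row => row.getAs[String]("userId"))
// sessionsByUser: RDD[(String, Iterable[org.apache.spark.sql.Row])]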
Otherwise, the DataFrame groupBy() method returns a GroupedData instance as follows:

import org.apache.spark.sql.GroupedData

val sessionsDF = Seq(
  ("day1", "user1", "session1", 100.0),
  ("day1", "user1", "session2", 200.0),
  ("day2", "user1", "session3", 300.0),
  ("day2", "user1", "session4", 400.0),
  ("day2", "user1", "session4", 99.0)
).toDF("day", "userId", "sessionId", "purchaseTotal")

val groupedSessions: GroupedData = sessionsDF.groupBy("day", "userId")
At the moment, there's not much you can do with a GroupedData instance except add aggregations as follows:

import org.apache.spark.sql.functions._

val groupedSessionsDF = groupedSessions.agg($"day", $"userId", countDistinct("sessionId"), sum("purchaseTotal"))
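To inspect the result, you can simply display the aggregated DataFrame (the aggregate column names are whatever Spark generates by default):

groupedSessionsDF.show()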
There's also a convenience method for agg() that takes a Map of "column" to "aggregation type" as follows:
val groupedSessionsDF = groupedSessions.agg($"day", $"userId", "sessionId" -> "count", "purchaseTotal" -> "sum" )
This convenience method doesn't currently support the complete list of aggregations, so you may need to revert to the slightly-less-convenient approach listed just above for things like countDistinct() and sumDistinct(), etc.
SQL
SQL/HiveQL requires that every selected column either appears in the GROUP BY or is wrapped in an aggregation function. The following query fails for that reason:
%sql SELECT day, userId, sessionId, purchaseTotal FROM test_df_group_by GROUP BY day, userId

Error in SQL statement: java.lang.RuntimeException: org.apache.spark.sql.AnalysisException: expression 'purchaseTotal' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.; at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38) at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:40) at
Here's a version that works...
%sql SELECT day, userId, count(sessionId) as session_count, sum(purchaseTotal) as purchase_total FROM test_df_group_by GROUP BY day, userId
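Note that test_df_group_by is assumed here to be the sessions DataFrame from above registered as a temporary table, e.g.:

sessionsDF.registerTempTable("test_df_group_by")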
You almost always want to use the DataFrame or SQL APIs over low-level RDDs. Especially for Python-based apps, you'll see a large performance boost from these higher-level libraries thanks to the optimizations applied at those layers.
11-18-2016 06:00 PM
Hi, @cfregly. Can I use GroupedData as an input to KMeans? Or more generally, can I apply a function that takes a grouped DataFrame as a parameter? Thanks.
03-27-2017 03:18 PM
Hi, I know that I'm posting to an old thread, but my question is as relevant as ever.
I'm trying to aggregate over windows and calculate some aggregations (avg + stddev), and in addition I also want to keep access to the original rows, so basically just add my two aggregates to the existing rows... any idea how to do this?
This is the Stack Overflow post (which got downvoted for whatever reason), but feel free to answer here...
@cfregly
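For what it's worth, one common way to do this (not necessarily what was meant in the original post) is to compute the aggregates over a window spec and attach them with withColumn, so the original rows are preserved. A rough sketch, reusing the sessionsDF columns from above and assuming Spark 1.6+ where stddev is available; the choice of partitioning columns is an assumption:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumed window: all sessions for the same user on the same day
val w = Window.partitionBy("userId", "day")

// Original rows are kept; the two aggregates are added as new columns
val sessionsWithStats = sessionsDF
  .withColumn("avgPurchase", avg($"purchaseTotal").over(w))
  .withColumn("stddevPurchase", stddev($"purchaseTotal").over(w))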
12-19-2019 02:47 AM
@cfregly: For DataFrames, you can use the following code to use groupBy without aggregations.
Df.groupBy(Df["column_name"]).agg({})