<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: How do I group my dataset by a key or combination of keys without doing any aggregations using RDDs, DataFrames, and SQL? in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-do-i-group-my-dataset-by-a-key-or-combination-of-keys/m-p/30487#M22112</link>
    <description>&lt;P&gt;&lt;B&gt;RDDs&lt;/B&gt;&lt;/P&gt;&lt;P&gt;Before DataFrames, you would use &lt;CODE&gt;RDD.groupBy()&lt;/CODE&gt; to group your data. This method is very expensive: it requires a complete reshuffle of all of your data to ensure that all records with the same key end up on the same Spark worker node.&lt;/P&gt;&lt;P&gt;Instead, you should use &lt;CODE&gt;RDD.aggregate()&lt;/CODE&gt;, &lt;CODE&gt;PairRDDFunctions.groupByKey()&lt;/CODE&gt;, or &lt;CODE&gt;PairRDDFunctions.reduceByKey()&lt;/CODE&gt; if you're grouping for the purpose of aggregating data with functions such as &lt;CODE&gt;sum()&lt;/CODE&gt; or &lt;CODE&gt;count()&lt;/CODE&gt;.&lt;/P&gt;&lt;P&gt;Note: you need to &lt;CODE&gt;import org.apache.spark.rdd.PairRDDFunctions&lt;/CODE&gt;.&lt;/P&gt;&lt;P&gt;&lt;B&gt;DataFrames&lt;/B&gt;&lt;/P&gt;&lt;P&gt;At the moment, all DataFrame grouping operations assume that you're grouping for the purpose of aggregating data.&lt;/P&gt;&lt;P&gt;If you're grouping for any other reason (not common), you'll need to get a reference to the underlying RDD as follows:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;sessionsDF.rdd.groupBy(...)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Otherwise, the DataFrame &lt;CODE&gt;groupBy()&lt;/CODE&gt; method returns a &lt;CODE&gt;GroupedData&lt;/CODE&gt; instance as follows:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import org.apache.spark.sql.GroupedData

val sessionsDF = Seq(
    ("day1", "user1", "session1", 100.0),
    ("day1", "user1", "session2", 200.0),
    ("day2", "user1", "session3", 300.0),
    ("day2", "user1", "session4", 400.0),
    ("day2", "user1", "session4", 99.0))
  .toDF("day", "userId", "sessionId", "purchaseTotal")

val groupedSessions: GroupedData = sessionsDF.groupBy("day", "userId")&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;At the moment, there's not much you can do with a &lt;CODE&gt;GroupedData&lt;/CODE&gt; instance except add &lt;A href="https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.sql.functions$" target="_blank"&gt;aggregations&lt;/A&gt; as follows:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import org.apache.spark.sql.functions._

val groupedSessionsDF = groupedSessions.agg($"day", $"userId", countDistinct("sessionId"), sum("purchaseTotal"))&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;There's also a convenience overload of &lt;CODE&gt;agg()&lt;/CODE&gt; that takes "column" -&amp;gt; "aggregation type" pairs as follows (note that it accepts only the pairs, not additional columns):&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;val groupedSessionsDF = groupedSessions.agg("sessionId" -&amp;gt; "count", "purchaseTotal" -&amp;gt; "sum")&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;This convenience method doesn't currently support the complete list of &lt;A href="https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.sql.functions$" target="_blank"&gt;aggregations&lt;/A&gt;, so you may need to revert to the slightly less convenient approach listed just above for things like &lt;CODE&gt;countDistinct()&lt;/CODE&gt;, &lt;CODE&gt;sumDistinct()&lt;/CODE&gt;, etc.&lt;/P&gt;&lt;P&gt;&lt;B&gt;SQL&lt;/B&gt;&lt;/P&gt;&lt;P&gt;SQL/HiveQL requires that you specify an aggregation function for every selected column that isn't in the GROUP BY. The following fails:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;%sql SELECT day, userId, sessionId, purchaseTotal FROM test_df_group_by GROUP BY day, userId&lt;/CODE&gt;&lt;/PRE&gt;&lt;PRE&gt;&lt;CODE&gt;Error in SQL statement: java.lang.RuntimeException: org.apache.spark.sql.AnalysisException: expression 'purchaseTotal' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:40)
    at &lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Here's a version that works:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;%sql SELECT day, userId, count(sessionId) AS session_count, sum(purchaseTotal) AS purchase_total FROM test_df_group_by GROUP BY day, userId&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;You always want to use the DataFrame or SQL APIs over low-level RDDs. Especially for Python-based apps, you'll see a large performance boost from these higher-level libraries, thanks to the optimizations at those layers.&lt;/P&gt;</description>
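The RDD trade-off described in the reply above (ship every record to its key's bucket vs. fold a running value per key) can be sketched with plain Python dictionaries. This is a conceptual analogue only, using made-up session data, not Spark code:

```python
# Conceptual sketch (plain Python, no Spark) of the difference between
# groupBy/groupByKey, which collect all records sharing a key, and
# reduceByKey, which keeps only one running value per key.
from collections import defaultdict

rows = [
    ("day1", "user1", "session1", 100.0),
    ("day1", "user1", "session2", 200.0),
    ("day2", "user1", "session3", 300.0),
    ("day2", "user1", "session4", 400.0),
    ("day2", "user1", "session4", 99.0),
]

# groupBy-style: every record travels to its key's bucket. In Spark this is
# expensive, because it implies a full shuffle of all rows.
groups = defaultdict(list)
for day, user, session, total in rows:
    groups[(day, user)].append((session, total))

# reduceByKey-style: only one running value per key is kept. In Spark this is
# cheaper, because partial sums combine on each worker before the shuffle.
sums = defaultdict(float)
for day, user, _session, total in rows:
    sums[(day, user)] += total

print(dict(sums))
```

The same asymmetry explains the reply's advice: if all you need in the end is an aggregate, never materialize the full per-key groups.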
    <pubDate>Tue, 26 May 2015 22:49:25 GMT</pubDate>
    <dc:creator>cfregly</dc:creator>
    <dc:date>2015-05-26T22:49:25Z</dc:date>
    <item>
      <title>How do I group my dataset by a key or combination of keys without doing any aggregations using RDDs, DataFrames, and SQL?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-group-my-dataset-by-a-key-or-combination-of-keys/m-p/30486#M22111</link>
      <description />
      <pubDate>Tue, 26 May 2015 18:38:48 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-group-my-dataset-by-a-key-or-combination-of-keys/m-p/30486#M22111</guid>
      <dc:creator>cfregly</dc:creator>
      <dc:date>2015-05-26T18:38:48Z</dc:date>
    </item>
    <item>
      <title>Re: How do I group my dataset by a key or combination of keys without doing any aggregations using RDDs, DataFrames, and SQL?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-group-my-dataset-by-a-key-or-combination-of-keys/m-p/30487#M22112</link>
      <description>&lt;P&gt;&lt;B&gt;RDDs&lt;/B&gt;&lt;/P&gt;&lt;P&gt;Before DataFrames, you would use &lt;CODE&gt;RDD.groupBy()&lt;/CODE&gt; to group your data. This method is very expensive: it requires a complete reshuffle of all of your data to ensure that all records with the same key end up on the same Spark worker node.&lt;/P&gt;&lt;P&gt;Instead, you should use &lt;CODE&gt;RDD.aggregate()&lt;/CODE&gt;, &lt;CODE&gt;PairRDDFunctions.groupByKey()&lt;/CODE&gt;, or &lt;CODE&gt;PairRDDFunctions.reduceByKey()&lt;/CODE&gt; if you're grouping for the purpose of aggregating data with functions such as &lt;CODE&gt;sum()&lt;/CODE&gt; or &lt;CODE&gt;count()&lt;/CODE&gt;.&lt;/P&gt;&lt;P&gt;Note: you need to &lt;CODE&gt;import org.apache.spark.rdd.PairRDDFunctions&lt;/CODE&gt;.&lt;/P&gt;&lt;P&gt;&lt;B&gt;DataFrames&lt;/B&gt;&lt;/P&gt;&lt;P&gt;At the moment, all DataFrame grouping operations assume that you're grouping for the purpose of aggregating data.&lt;/P&gt;&lt;P&gt;If you're grouping for any other reason (not common), you'll need to get a reference to the underlying RDD as follows:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;sessionsDF.rdd.groupBy(...)&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Otherwise, the DataFrame &lt;CODE&gt;groupBy()&lt;/CODE&gt; method returns a &lt;CODE&gt;GroupedData&lt;/CODE&gt; instance as follows:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import org.apache.spark.sql.GroupedData

val sessionsDF = Seq(
    ("day1", "user1", "session1", 100.0),
    ("day1", "user1", "session2", 200.0),
    ("day2", "user1", "session3", 300.0),
    ("day2", "user1", "session4", 400.0),
    ("day2", "user1", "session4", 99.0))
  .toDF("day", "userId", "sessionId", "purchaseTotal")

val groupedSessions: GroupedData = sessionsDF.groupBy("day", "userId")&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;At the moment, there's not much you can do with a &lt;CODE&gt;GroupedData&lt;/CODE&gt; instance except add &lt;A href="https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.sql.functions$" target="_blank"&gt;aggregations&lt;/A&gt; as follows:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;import org.apache.spark.sql.functions._

val groupedSessionsDF = groupedSessions.agg($"day", $"userId", countDistinct("sessionId"), sum("purchaseTotal"))&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;There's also a convenience overload of &lt;CODE&gt;agg()&lt;/CODE&gt; that takes "column" -&amp;gt; "aggregation type" pairs as follows (note that it accepts only the pairs, not additional columns):&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;val groupedSessionsDF = groupedSessions.agg("sessionId" -&amp;gt; "count", "purchaseTotal" -&amp;gt; "sum")&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;This convenience method doesn't currently support the complete list of &lt;A href="https://spark.apache.org/docs/1.3.1/api/scala/index.html#org.apache.spark.sql.functions$" target="_blank"&gt;aggregations&lt;/A&gt;, so you may need to revert to the slightly less convenient approach listed just above for things like &lt;CODE&gt;countDistinct()&lt;/CODE&gt;, &lt;CODE&gt;sumDistinct()&lt;/CODE&gt;, etc.&lt;/P&gt;&lt;P&gt;&lt;B&gt;SQL&lt;/B&gt;&lt;/P&gt;&lt;P&gt;SQL/HiveQL requires that you specify an aggregation function for every selected column that isn't in the GROUP BY. The following fails:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;%sql SELECT day, userId, sessionId, purchaseTotal FROM test_df_group_by GROUP BY day, userId&lt;/CODE&gt;&lt;/PRE&gt;&lt;PRE&gt;&lt;CODE&gt;Error in SQL statement: java.lang.RuntimeException: org.apache.spark.sql.AnalysisException: expression 'purchaseTotal' is neither present in the group by, nor is it an aggregate function. Add to group by or wrap in first() if you don't care which value you get.;
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:38)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:40)
    at &lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;Here's a version that works:&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;%sql SELECT day, userId, count(sessionId) AS session_count, sum(purchaseTotal) AS purchase_total FROM test_df_group_by GROUP BY day, userId&lt;/CODE&gt;&lt;/PRE&gt;&lt;P&gt;You always want to use the DataFrame or SQL APIs over low-level RDDs. Especially for Python-based apps, you'll see a large performance boost from these higher-level libraries, thanks to the optimizations at those layers.&lt;/P&gt;</description>
      <pubDate>Tue, 26 May 2015 22:49:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-group-my-dataset-by-a-key-or-combination-of-keys/m-p/30487#M22112</guid>
      <dc:creator>cfregly</dc:creator>
      <dc:date>2015-05-26T22:49:25Z</dc:date>
    </item>
    <item>
      <title>Re: How do I group my dataset by a key or combination of keys without doing any aggregations using RDDs, DataFrames, and SQL?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-group-my-dataset-by-a-key-or-combination-of-keys/m-p/30488#M22113</link>
      <description>&lt;P&gt;Hi, @cfregly. Can I use a &lt;CODE&gt;GroupedData&lt;/CODE&gt; instance as an input to KMeans? Or, more generally, can I apply a function that takes a grouped DataFrame as a parameter? Thanks.&lt;/P&gt;</description>
      <pubDate>Sat, 19 Nov 2016 02:00:29 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-group-my-dataset-by-a-key-or-combination-of-keys/m-p/30488#M22113</guid>
      <dc:creator>zhuangmz</dc:creator>
      <dc:date>2016-11-19T02:00:29Z</dc:date>
    </item>
    <item>
      <title>Re: How do I group my dataset by a key or combination of keys without doing any aggregations using RDDs, DataFrames, and SQL?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-group-my-dataset-by-a-key-or-combination-of-keys/m-p/30489#M22114</link>
      <description>&lt;P&gt;Hi, I know that I'm posting to an old thread, but my question is as relevant as ever &lt;span class="lia-unicode-emoji" title=":winking_face:"&gt;😉&lt;/span&gt;&lt;/P&gt;
&lt;P&gt;I'm trying to aggregate over windows and calculate some aggregations (avg + stddev), and in addition I want access to the original rows as well, so basically I just want to add my two aggregates to the existing windows. Any idea how to do this?&lt;/P&gt;
&lt;P&gt;This is my post on Stack Overflow (which got downvoted for whatever reason), but feel free to answer here:&lt;/P&gt;
&lt;P&gt;&lt;A href="http://stackoverflow.com/questions/43014064/how-to-calculate-z-score-on-dataframe-api-in-apachespark-stucured-streaming/43057323#43057323" target="_blank"&gt;http://stackoverflow.com/questions/43014064/how-to-calculate-z-score-on-dataframe-api-in-apachespark-stucured-streaming/43057323#43057323&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;@cfregly&lt;/P&gt;</description>
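The shape of the question above is a common pattern: compute per-window aggregates (avg, stddev) and attach them back to the original rows, e.g. to derive a per-row z-score. A minimal plain-Python sketch of that shape, on hypothetical data (this is not the Spark DataFrame or window API):

```python
# Sketch: per-group avg and stddev, joined back onto every original row.
import math
from collections import defaultdict

# (window, value) pairs; the windows and values are made up for illustration.
rows = [("w1", 1.0), ("w1", 2.0), ("w1", 3.0), ("w2", 10.0), ("w2", 14.0)]

# 1) aggregate per window
buckets = defaultdict(list)
for window, value in rows:
    buckets[window].append(value)

stats = {}
for window, values in buckets.items():
    mean = sum(values) / len(values)
    var = sum((v - mean) ** 2 for v in values) / len(values)  # population stddev
    stats[window] = (mean, math.sqrt(var))

# 2) "join" the aggregates back to the original rows and compute a z-score
enriched = []
for window, value in rows:
    mean, std = stats[window]
    z = (value - mean) / std if std else 0.0
    enriched.append((window, value, mean, std, z))
```

In Spark terms, step 1 is a grouped aggregation and step 2 is a join back on the grouping key; window functions fuse the two steps so no explicit join is needed.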
      <pubDate>Mon, 27 Mar 2017 22:18:54 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-group-my-dataset-by-a-key-or-combination-of-keys/m-p/30489#M22114</guid>
      <dc:creator>RomeoKienzler</dc:creator>
      <dc:date>2017-03-27T22:18:54Z</dc:date>
    </item>
    <item>
      <title>Re: How do I group my dataset by a key or combination of keys without doing any aggregations using RDDs, DataFrames, and SQL?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-group-my-dataset-by-a-key-or-combination-of-keys/m-p/30490#M22115</link>
      <description>&lt;P&gt;@cfregly: For DataFrames, you can use the following code to group by a column without performing any aggregations.&lt;/P&gt;&lt;PRE&gt;&lt;CODE&gt;Df.groupBy(Df["column_name"]).agg({})&lt;/CODE&gt;&lt;/PRE&gt;</description>
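As a rough mental model of the snippet above (plain Python, not PySpark): grouping with an empty aggregation map yields one row per distinct key and nothing else.

```python
# Plain-Python analogue of Df.groupBy(Df["column_name"]).agg({}):
# the result is just the distinct grouping keys, in first-seen order.
rows = [
    {"column_name": "a", "x": 1},
    {"column_name": "a", "x": 2},
    {"column_name": "b", "x": 3},
]

seen = []
for row in rows:
    key = row["column_name"]
    if key not in seen:
        seen.append(key)

result = [{"column_name": k} for k in seen]
print(result)
```

Note that, like a real grouped aggregation with no aggregates, the non-key columns (here the hypothetical `x`) do not survive into the result.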
      <pubDate>Thu, 19 Dec 2019 10:47:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-group-my-dataset-by-a-key-or-combination-of-keys/m-p/30490#M22115</guid>
      <dc:creator>GeethGovindSrin</dc:creator>
      <dc:date>2019-12-19T10:47:04Z</dc:date>
    </item>
  </channel>
</rss>