<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: How do I register a UDF that returns an array of tuples in scala/spark? in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29626#M21346</link>
    <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;I'm assuming you already found your answer, but since this is the top result that comes up when googling this issue and it remains unanswered, I'll add my 2 cents.&lt;/P&gt;
&lt;P&gt;As far as I know, all the elements in your ArrayType have to be of the same type.&lt;/P&gt;
&lt;P&gt;So, for instance, in PySpark you can register a simple function that returns a list of strings with the following syntax:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;sqlContext.udf.register("your_func_name", your_func_name, ArrayType(StringType()))&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;I assume the reason your PySpark code works is that defining the array elements as StructTypes provides a workaround for this restriction, which might not work the same way in Scala.&lt;/P&gt; 
&lt;P&gt;&lt;/P&gt;</description>
    <pubDate>Tue, 06 Dec 2016 10:32:48 GMT</pubDate>
    <dc:creator>MatiasRotenberg</dc:creator>
    <dc:date>2016-12-06T10:32:48Z</dc:date>
    <item>
      <title>How do I register a UDF that returns an array of tuples in scala/spark?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29621#M21341</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;I'm relatively new to Scala. In the past, I was able to do the following in Python:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;import datetime as dt

def foo(p1, p2):
    # p1 and p2 are unused; the function returns a fixed list of (int, str, float, datetime) tuples
    return [
        (1, "1", 1.1, dt.datetime(2014, 4, 17, 1, 0)),
        (2, "2", 2.2, dt.datetime(2014, 4, 17, 2, 0)),
        (3, "3", 3.3, dt.datetime(2014, 4, 17, 3, 0))
    ]&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;Now I register it to a UDF:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;from pyspark.sql.types import *
schema = ArrayType(
            StructType([
                StructField('int'     , IntegerType()   , False),
                StructField('string'  , StringType()    , False),
                StructField('float'   , DoubleType()    , False),  # Python floats map to DoubleType
                StructField('datetime', TimestampType() , False)
            ])
        )
sqlContext.registerFunction("foo", foo, schema)
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;Finally, here is how I intend to use it:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;sqlContext.sql("""
select
    a.foo_output.int      as f_int
  , a.foo_output.float    as f_float
  , a.foo_output.string   as f_string
  , a.foo_output.datetime as f_datetime
from (select explode(foo(1, 7)) as foo_output) a
""").show()
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;This actually works in PySpark as written above. See the output:&lt;/P&gt;
&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper" image-alt="0693f000007OoHdAAK"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/2530i544A5C79C38CFFDC/image-size/large?v=v2&amp;amp;px=999" role="button" title="0693f000007OoHdAAK" alt="0693f000007OoHdAAK" /&gt;&lt;/span&gt;&lt;/P&gt;
&lt;P&gt;I was not able to get the same thing to work in Scala. Can anyone point me to the proper way to do this in Scala/Spark? When I tried to define the schema and register the function:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;import java.sql.Timestamp
import org.apache.spark.sql.types._

def foo(p1: Integer, p2: Integer): Array[(Int, String, Float, Timestamp)] = {
  // Timestamp.valueOf avoids the deprecated Timestamp(year, month, ...) constructor,
  // which counts years from 1900 and months from 0
  Array(
    (1, "1", 1.1f, Timestamp.valueOf("2014-04-17 01:00:00")),
    (2, "2", 2.2f, Timestamp.valueOf("2014-04-17 02:00:00")),
    (3, "3", 3.3f, Timestamp.valueOf("2014-04-17 03:00:00"))
  )
}

// register to Spark
val foo_schema = ArrayType(StructType(Array(
  StructField("int"     , IntegerType  , false),
  StructField("string"  , StringType   , false),
  StructField("float"   , FloatType    , false),
  StructField("datetime", TimestampType, false)
)))
sql_context.udf.register("foo", foo _)&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;I get a runtime error:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;org.apache.spark.sql.AnalysisException: No such struct field int in _1, _2, _3, _4; line 2 pos 4
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;From the error message, it seems clear that I didn't attach the schema properly; indeed, nowhere in the code above did I tell Spark about it. So I tried the following:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;sql_context.udf.register("foo", foo _, foo_schema);
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;However, it gives me a compiler error:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;[ERROR] /Users/rykelley/Development/rovi/IntegralReach-Main/ADW/rovi-master-schedule/src/main/scala/com/rovicorp/adw/RoviMasterSchedule/BuildRoviMasterSchedule.scala:247: error: overloaded method value register with alternatives:
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF22[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF21[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF20[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF19[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF18[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF17[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF16[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF15[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF14[_, _, _, _, _, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF13[_, _, _, _, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF12[_, _, _, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF11[_, _, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF10[_, _, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF9[_, _, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF8[_, _, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF7[_, _, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF6[_, _, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF5[_, _, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF4[_, _, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF3[_, _, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF2[_, _, _],returnType: org.apache.spark.sql.types.DataType)Unit &amp;lt;and&amp;gt;
[INFO]   (name: String,f: org.apache.spark.sql.api.java.UDF1[_, _],returnType: org.apache.spark.sql.types.DataType)Unit
[INFO]  cannot be applied to (String, (Integer, Integer) =&amp;gt; Array[(Int, String, Float, java.sql.Timestamp)], org.apache.spark.sql.types.ArrayType)
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;Can someone point me in the right direction?&lt;/P&gt;
&lt;P&gt;Note: I'm using Spark 1.6.1.&lt;/P&gt;
&lt;P&gt;Thanks&lt;/P&gt;
&lt;P&gt;Ryan&lt;/P&gt; 
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 30 Jun 2016 20:28:05 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29621#M21341</guid>
      <dc:creator>kelleyrw</dc:creator>
      <dc:date>2016-06-30T20:28:05Z</dc:date>
    </item>
    <item>
      <title>Re: How do I register a UDF that returns an array of tuples in scala/spark?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29622#M21342</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;I'd recommend following the Databricks guide to accomplish this:&lt;/P&gt;
&lt;P&gt;&lt;A href="https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/03%20UDF%20and%20UDAF%20-%20scala.html" target="test_blank"&gt;https://docs.cloud.databricks.com/docs/latest/databricks_guide/index.html#04%20SQL,%20DataFrames%20%26%20Datasets/03%20UDF%20and%20UDAF%20-%20scala.html&lt;/A&gt;&lt;/P&gt;
&lt;P&gt;I've imported this guide myself into my environment and was able to get a similar example working no problem. &lt;/P&gt; 
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 01 Jul 2016 16:57:27 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29622#M21342</guid>
      <dc:creator>miklos</dc:creator>
      <dc:date>2016-07-01T16:57:27Z</dc:date>
    </item>
    <item>
      <title>Re: How do I register a UDF that returns an array of tuples in scala/spark?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29623#M21343</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;That does not really answer my question since these examples do not have a case where the return type is an array of tuples. Can you share your solution?&lt;/P&gt; 
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 01 Jul 2016 17:20:02 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29623#M21343</guid>
      <dc:creator>kelleyrw</dc:creator>
      <dc:date>2016-07-01T17:20:02Z</dc:date>
    </item>
    <item>
      <title>Re: How do I register a UDF that returns an array of tuples in scala/spark?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29624#M21344</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;I found a satisfying solution. Instead of using a return type of Array[Tuple4[...]], I used a case class to create a simple structure:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;import java.sql.Timestamp

// A case class gives the struct fields explicit names instead of _1 .. _4
case class Result(
    f_int    : Int,
    f_string : String,
    f_float  : Float,
    f_ts     : Timestamp
)

def foo2(p1: Integer, p2: Integer): Array[Result] = {
  Array(
    Result(1, "1", 1.1f, Timestamp.valueOf("2014-04-17 01:00:00")),
    Result(2, "2", 2.2f, Timestamp.valueOf("2014-04-17 02:00:00")),
    Result(3, "3", 3.3f, Timestamp.valueOf("2014-04-17 03:00:00"))
  )
}
sqlContext.udf.register("foo2", foo2 _); 
sqlContext.sql("""
select
    a.foo_output.f_int      as f_int
  , a.foo_output.f_float    as f_float
  , a.foo_output.f_string   as f_string
  , a.foo_output.f_ts       as f_datetime
from (select explode(foo2(1, 7)) as foo_output) a
""").show()
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;This seemed to give the desired output, matching the PySpark result.&lt;/P&gt;
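&lt;P&gt;The same case-class UDF can also be called through the DataFrame API instead of a SQL string. A minimal sketch, assuming Spark 2.x or later with a SparkSession (on 1.6.1 the equivalent calls would go through sqlContext, which I have not checked); foo2Udf is an illustrative name, and Result is repeated so the block runs on its own:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, lit, udf}

// Same structure as Result above, repeated so this sketch is self-contained
case class Result(f_int: Int, f_string: String, f_float: Float, f_ts: Timestamp)

// Local master for a standalone run; in a notebook or spark-shell the existing session is returned
val spark = SparkSession.builder.master("local[*]").getOrCreate()

def foo2(p1: Int, p2: Int): Seq[Result] = Seq(
  Result(1, "1", 1.1f, Timestamp.valueOf("2014-04-17 01:00:00")),
  Result(2, "2", 2.2f, Timestamp.valueOf("2014-04-17 02:00:00")),
  Result(3, "3", 3.3f, Timestamp.valueOf("2014-04-17 03:00:00"))
)

// udf() infers an array of structs with fields f_int, f_string, f_float, f_ts from the case class
val foo2Udf = udf(foo2 _)

// spark.range(1) supplies a single driving row; explode turns the returned array into rows
val result = spark.range(1)
  .select(explode(foo2Udf(lit(1), lit(7))).as("foo_output"))
  .select("foo_output.f_int", "foo_output.f_float", "foo_output.f_string", "foo_output.f_ts")

result.show()&lt;/CODE&gt;&lt;/PRE&gt;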
&lt;P&gt;I'm still curious how to explicitly return an array of tuples. The fact that I got it to work in PySpark suggests there is a way to accomplish the same thing in Scala/Spark.&lt;/P&gt;
&lt;P&gt;Any thoughts?&lt;/P&gt; 
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Fri, 01 Jul 2016 18:26:09 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29624#M21344</guid>
      <dc:creator>kelleyrw</dc:creator>
      <dc:date>2016-07-01T18:26:09Z</dc:date>
    </item>
    <item>
      <title>Re: How do I register a UDF that returns an array of tuples in scala/spark?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29625#M21345</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;Any response on this? The link provided doesn't answer the question.&lt;/P&gt; 
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 06 Jul 2016 13:41:13 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29625#M21345</guid>
      <dc:creator>kelleyrw</dc:creator>
      <dc:date>2016-07-06T13:41:13Z</dc:date>
    </item>
    <item>
      <title>Re: How do I register a UDF that returns an array of tuples in scala/spark?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29626#M21346</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;I'm assuming you already found your answer, but since this is the top result that comes up when googling this issue and it remains unanswered, I'll add my 2 cents.&lt;/P&gt;
&lt;P&gt;As far as I know, all the elements in your ArrayType have to be of the same type.&lt;/P&gt;
&lt;P&gt;So, for instance, in PySpark you can register a simple function that returns a list of strings with the following syntax:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;sqlContext.udf.register("your_func_name", your_func_name, ArrayType(StringType()))&lt;/CODE&gt;&lt;/PRE&gt;
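&lt;P&gt;For comparison, a Scala UDF usually does not need the DataType argument at all: the register overloads that take a Scala function derive the return type from the function's signature (a Seq[String] becomes an array of strings). A minimal sketch, assuming Spark 2.x or later with a SparkSession; splitWords and split_words are hypothetical names:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;import org.apache.spark.sql.SparkSession

// Local master for a standalone run; in a notebook or spark-shell the existing session is returned
val spark = SparkSession.builder.master("local[*]").getOrCreate()

// Hypothetical example function: split a string on single spaces
def splitWords(s: String): Seq[String] = s.split(" ").toSeq

// No DataType argument: Spark derives the array-of-string return type from Seq[String]
spark.udf.register("split_words", splitWords _)

val words = spark.sql("select split_words('a b c') as words")
words.printSchema()
words.show()&lt;/CODE&gt;&lt;/PRE&gt;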
&lt;P&gt;I assume the reason your PySpark code works is that defining the array elements as StructTypes provides a workaround for this restriction, which might not work the same way in Scala.&lt;/P&gt; 
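&lt;P&gt;One possible way to attach an explicit schema in Scala is the (name, UDF2, DataType) overload that appears in the compiler error in the question: it accepts a Java-style org.apache.spark.sql.api.java.UDF2 rather than a Scala function, which is why passing foo _ together with foo_schema did not compile. A minimal sketch, assuming Spark 2.x or later with a SparkSession (on 1.6.1 the same overload is on sqlContext.udf, but I have not run it there); fooUdf and fooSchema are illustrative names, and the sketch relies on Spark converting each returned tuple into the declared struct by position:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.api.java.UDF2
import org.apache.spark.sql.types._

// Local master for a standalone run; in a notebook or spark-shell the existing session is returned
val spark = SparkSession.builder.master("local[*]").getOrCreate()

// The schema from the question; its field names replace the default _1 .. _4
val fooSchema = ArrayType(StructType(Seq(
  StructField("int", IntegerType, nullable = false),
  StructField("string", StringType, nullable = false),
  StructField("float", FloatType, nullable = false),
  StructField("datetime", TimestampType, nullable = false)
)))

// A Java-style UDF2 matches the (name, UDF2, DataType) overload listed in the compiler error
val fooUdf = new UDF2[Integer, Integer, Seq[(Int, String, Float, Timestamp)]] {
  override def call(p1: Integer, p2: Integer): Seq[(Int, String, Float, Timestamp)] = Seq(
    (1, "1", 1.1f, Timestamp.valueOf("2014-04-17 01:00:00")),
    (2, "2", 2.2f, Timestamp.valueOf("2014-04-17 02:00:00")),
    (3, "3", 3.3f, Timestamp.valueOf("2014-04-17 03:00:00"))
  )
}
spark.udf.register("foo", fooUdf, fooSchema)

// The original query, using the field names declared in fooSchema
val result = spark.sql("""
  select
      a.foo_output.int      as f_int
    , a.foo_output.float    as f_float
    , a.foo_output.string   as f_string
    , a.foo_output.datetime as f_datetime
  from (select explode(foo(1, 7)) as foo_output) a
""")
result.show()&lt;/CODE&gt;&lt;/PRE&gt;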
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Tue, 06 Dec 2016 10:32:48 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29626#M21346</guid>
      <dc:creator>MatiasRotenberg</dc:creator>
      <dc:date>2016-12-06T10:32:48Z</dc:date>
    </item>
    <item>
      <title>Re: How do I register a UDF that returns an array of tuples in scala/spark?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29627#M21347</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;@kelleyrw, it might be worth mentioning that your code works well with Spark 2.0 (I've tried it with 2.0.2). However, this is still not well documented: tuples are fine as the return type but not as the input type:&lt;/P&gt;
&lt;UL&gt;&lt;LI&gt;For UDF &lt;I&gt;output&lt;/I&gt; types, you should use plain Scala types (e.g. tuples) as the type of the array elements&lt;/LI&gt;&lt;LI&gt;For UDF &lt;I&gt;input&lt;/I&gt; types, arrays that contain tuples would actually have to be declared as &lt;PRE&gt;&lt;CODE&gt;mutable.WrappedArray[Row]&lt;/CODE&gt;&lt;/PRE&gt;&lt;/LI&gt;&lt;/UL&gt;
&lt;P&gt;So, if you want to manipulate the input array and return the result, you'll have to perform some &lt;I&gt;conversion&lt;/I&gt; from Row into Tuples explicitly. &lt;/P&gt; 
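&lt;P&gt;To illustrate the output side: a minimal sketch, assuming Spark 2.x or later with a SparkSession, in which the UDF keeps returning plain tuples. Spark names the resulting struct fields _1 to _4 (the names listed in the AnalysisException in the original question), so the query refers to those names and aliases them:&lt;/P&gt;
&lt;PRE&gt;&lt;CODE&gt;import java.sql.Timestamp
import org.apache.spark.sql.SparkSession

// Local master for a standalone run; in a notebook or spark-shell the existing session is returned
val spark = SparkSession.builder.master("local[*]").getOrCreate()

// Plain tuples as the array elements: Spark infers an array of structs with fields _1, _2, _3, _4
def foo(p1: Int, p2: Int): Seq[(Int, String, Float, Timestamp)] = Seq(
  (1, "1", 1.1f, Timestamp.valueOf("2014-04-17 01:00:00")),
  (2, "2", 2.2f, Timestamp.valueOf("2014-04-17 02:00:00")),
  (3, "3", 3.3f, Timestamp.valueOf("2014-04-17 03:00:00"))
)
spark.udf.register("foo", foo _)

// Reference the positional field names and alias them to readable column names
val result = spark.sql("""
  select
      a.foo_output._1 as f_int
    , a.foo_output._2 as f_string
    , a.foo_output._3 as f_float
    , a.foo_output._4 as f_datetime
  from (select explode(foo(1, 7)) as foo_output) a
""")
result.show()&lt;/CODE&gt;&lt;/PRE&gt;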
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Mon, 09 Jan 2017 16:18:26 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29627#M21347</guid>
      <dc:creator>TzachZohar</dc:creator>
      <dc:date>2017-01-09T16:18:26Z</dc:date>
    </item>
    <item>
      <title>Re: How do I register a UDF that returns an array of tuples in scala/spark?</title>
      <link>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29628#M21348</link>
      <description>&lt;P&gt;&lt;/P&gt;
&lt;P&gt;Hello,&lt;/P&gt;
&lt;P&gt;Just in case, here is an example of the solution proposed above:&lt;/P&gt; 
&lt;PRE&gt;&lt;CODE&gt;import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
// toDS and the '_1 column syntax need spark.implicits._ (pre-imported in notebooks and spark-shell)
import spark.implicits._
val data = Seq(("A", Seq((3,4),(5,6),(7,10))), ("B", Seq((-1, 1)))).toDS
data.printSchema
&lt;/CODE&gt;&lt;/PRE&gt;
&lt;PRE&gt;&lt;CODE&gt;root
 |-- _1: string (nullable = true)
 |-- _2: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- _1: integer (nullable = false)
 |    |    |-- _2: integer (nullable = false)&lt;/CODE&gt;&lt;/PRE&gt; 
&lt;PRE&gt;&lt;CODE&gt;// The input array of structs arrives as Seq[Row]; the returned tuples become structs again
def fun(s: Seq[Row]): Seq[(Int, Int)] = {
  s.filter(tuple =&amp;gt; tuple.getInt(0) &amp;gt; 0)
   .map(tuple =&amp;gt; (tuple.getInt(0), tuple.getInt(1)))
}
val funUdf = udf(fun _)
data.select('_1, '_2, funUdf('_2) as "filtered").show(false)&lt;/CODE&gt;&lt;/PRE&gt; 
&lt;PRE&gt;&lt;CODE&gt;+---+----------------------+----------------------+
|_1 |_2                    |filtered              |
+---+----------------------+----------------------+
|A  |[[3,4], [5,6], [7,10]]|[[3,4], [5,6], [7,10]]|
|B  |[[-1,1]]              |[]                    |
+---+----------------------+----------------------+&lt;/CODE&gt;&lt;/PRE&gt;
&lt;P&gt;Best regards,&lt;/P&gt;
&lt;P&gt;Maxim Gekk&lt;/P&gt; 
&lt;P&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 19 Oct 2017 00:40:07 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/how-do-i-register-a-udf-that-returns-an-array-of-tuples-in-scala/m-p/29628#M21348</guid>
      <dc:creator>__max</dc:creator>
      <dc:date>2017-10-19T00:40:07Z</dc:date>
    </item>
  </channel>
</rss>

