Access struct elements inside dataframe?

schnee1
New Contributor III

I have a JSON data set that contains a price in a string like "USD 5.00". I'd like to convert the numeric portion to a Double to use in an MLlib LabeledPoint, and I have managed to split the price string into an array of strings. The code below creates a data set with the correct structure:

--------------

import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.mllib.regression.LabeledPoint
import sqlContext.implicits._  // for .toDF() (auto-imported in spark-shell and notebooks)

case class Obs(f1: Double, f2: Double, price: Array[String])

val obs1 = new Obs(1, 2, Array("USD", "5.00"))
val obs2 = new Obs(2, 1, Array("USD", "3.00"))

val df = sc.parallelize(Seq(obs1, obs2)).toDF()
df.printSchema()
df.show()

// This line fails at runtime with the ClassCastException shown below
val labeled = df.map(row => LabeledPoint(row.get(2).asInstanceOf[Array[String]].apply(1).toDouble, Vectors.dense(row.getDouble(0), row.getDouble(1))))

labeled.take(2).foreach(println)

--------------------

When I run this, I get the following (and a bit more):

df: org.apache.spark.sql.DataFrame=[f1: double, f2: double, price: array<string>]

"price" is an array of strings.

I also get a ClassCastException:

java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [Ljava.lang.String;

which is probably due to the 'println', but it probably also means that I'm not getting the second element of the 'price' column.

Help?


User16826991422
Contributor

Hi Schnee -

In this case I would use the explode operator on DataFrames.

explode takes an array column and produces one output row per element, so you can then apply an operation to every element of the array.
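A minimal sketch of explode on the `price` column, assuming Spark 2.x or later (it uses `SparkSession`, which this thread predates) and a local master. `ExplodeSketch` is a hypothetical name. It shows that each array element becomes its own row:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, explode}

// Same shape as the Obs class in the question
case class Obs(f1: Double, f2: Double, price: Array[String])

object ExplodeSketch {
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("explode-sketch")
    .getOrCreate()
  import spark.implicits._

  val df = Seq(
    Obs(1, 2, Array("USD", "5.00")),
    Obs(2, 1, Array("USD", "3.00"))
  ).toDF()

  // explode emits one row per element of `price`:
  // two input rows with two-element arrays become four output rows
  val exploded = df.select(col("f1"), explode(col("price")).as("token"))

  // Pull the exploded tokens back to the driver
  val tokens: Seq[String] = exploded.collect().map(_.getString(1)).toSeq

  def main(args: Array[String]): Unit = {
    exploded.show()
    spark.stop()
  }
}
```

For this particular problem only the second element is needed, so selecting it directly with getItem (as in the solution further down the thread) avoids creating the extra rows.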

schnee1
New Contributor III

Thanks, @cfregly. I think I need a lot more remedial learning in Scala.

The reference you provided works great, but when I try to apply it to my problem with:

val dfExploded = df.explode(df("price")) { case Row(pr: Array[Row]) => pr.map(pr => Price(pr(0).asInstanceOf[String], pr(1).asInstanceOf[String]) ) }

dfExploded.show()

I wind up with exceptions.

cfregly
Contributor

@schnee

ha! nah, this is an unnecessarily verbose and complex way of doing a fairly common transformation.

it would be nice to have a df.explodeArray() method.

anyway, what type of exceptions are you seeing?

schnee1
New Contributor III

I wound up getting past it with something like:

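import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.Row

// Pack f1 and f2 into a single vector column named "features"
val assembler = new VectorAssembler()
  .setInputCols(Array("f1", "f2"))
  .setOutputCol("features")

// getItem(1) takes the second array element ("5.00") and cast turns it into a double
val labeled = assembler.transform(df)
  .select($"price".getItem(1).cast("double"), $"features")
  .map { case Row(price: Double, features: Vector) => LabeledPoint(price, features) }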

which seems much less verbose (h/t Stack Overflow) and directly "promotes" the array's elements to where I need them.

I also got past the exceptions (they were, IIRC, match errors).

Thanks for the lean-in.
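The getItem/VectorAssembler approach above can be sketched as a self-contained program. This assumes Spark 2.x or later, where VectorAssembler emits `org.apache.spark.ml.linalg` vectors, so the sketch converts them with `Vectors.fromML` before building the MLlib `LabeledPoint`, and maps over `.rdd` so the result matches the RDD-based code in the question. `AssemblerSketch` is a hypothetical name:

```scala
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.linalg.{Vector => MLVector}
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.sql.{Row, SparkSession}

// Same shape as the Obs class in the question
case class Obs(f1: Double, f2: Double, price: Array[String])

object AssemblerSketch {
  val spark: SparkSession = SparkSession.builder()
    .master("local[*]")
    .appName("assembler-sketch")
    .getOrCreate()
  import spark.implicits._

  val df = Seq(
    Obs(1, 2, Array("USD", "5.00")),
    Obs(2, 1, Array("USD", "3.00"))
  ).toDF()

  // Pack f1 and f2 into a single vector column named "features"
  val assembler = new VectorAssembler()
    .setInputCols(Array("f1", "f2"))
    .setOutputCol("features")

  val labeled: Array[LabeledPoint] = assembler.transform(df)
    // getItem(1) picks the second array element; cast parses it as a double
    .select($"price".getItem(1).cast("double").as("label"), $"features")
    .rdd
    .map { case Row(label: Double, features: MLVector) =>
      // Convert the spark.ml vector to the spark.mllib type LabeledPoint expects
      LabeledPoint(label, Vectors.fromML(features))
    }
    .collect()

  def main(args: Array[String]): Unit = {
    labeled.foreach(println)
    spark.stop()
  }
}
```

On Spark 1.x, VectorAssembler already produces `mllib` vectors and `DataFrame.map` returns an RDD, so the snippet in the post works without the `fromML` conversion or the `.rdd` step.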

schnee1
New Contributor III

For a bit more detail, this sort of worked:

val dfExploded = df.explode(df("price")) { case Row(pr: WrappedArray[String]) => pr.map(pr => Price(pr(0).toString, pr(1).toString) ) }

dfExploded.show()

(I had to use "WrappedArray" instead of "Array" to get past the exceptions)

but the output had some problems (character limits in this forum are forcing me to be terse)
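In the snippet above, the lambda parameter `pr` shadows the outer `pr`, so inside `map` it is each string element ("USD", then "5.00"), and `pr(0)` / `pr(1)` select single characters; that likely explains the odd output. A sketch of matching the whole array instead, using a `scala.collection.Seq(...)` pattern so it does not depend on the concrete collection class (WrappedArray here, but it can differ across Scala versions). It builds a `Row` by hand as a stand-in for a DataFrame row and needs only `spark-sql` on the classpath; `SeqPatternSketch` and `priceOf` are hypothetical names:

```scala
import org.apache.spark.sql.Row
import scala.collection.mutable

object SeqPatternSketch {
  // Stand-in for a row of the question's DataFrame: Spark returns array<string>
  // columns as some Seq implementation, never as an Array[String]
  val row: Row = Row(1.0, 2.0, mutable.ArraySeq("USD", "5.00"))

  // Match the array column against a two-element Seq pattern;
  // scala.collection.Seq accepts both mutable and immutable Seqs
  def priceOf(r: Row): Option[(String, Double)] = r match {
    case Row(_: Double, _: Double, scala.collection.Seq(currency: String, amount: String)) =>
      Some((currency, amount.toDouble))
    case _ => None
  }

  def main(args: Array[String]): Unit = println(priceOf(row))
}
```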

yuga
New Contributor II

@schnee

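It is clear from the exception that row.get(2) returns a WrappedArray, not an Array. Spark represents the values of an ArrayType column in a Row as a scala.collection.Seq (a WrappedArray here), not as a Java array, so the cast to Array[String] fails. To retrieve the price, use row.getSeq[String](2)(1).toDouble instead of row.get(2).asInstanceOf[Array[String]].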
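The difference can be reproduced without a cluster. The sketch below builds a `Row` by hand as a stand-in for a row from `df` (an assumption, not a real DataFrame row) and compares the `Array[String]` cast from the question with `Row.getSeq`. It needs only `spark-sql` on the classpath; `GetSeqSketch` is a hypothetical name:

```scala
import org.apache.spark.sql.Row
import scala.collection.mutable
import scala.util.Try

object GetSeqSketch {
  // Mimics what Spark returns for an array<string> column: a Seq, not a Java String[]
  val row: Row = Row(1.0, 2.0, mutable.ArraySeq("USD", "5.00"))

  // The cast from the question: the value is a Seq, so the cast to String[] throws
  val castAttempt: Try[Double] =
    Try(row.get(2).asInstanceOf[Array[String]].apply(1).toDouble)

  // Reading the column as a Seq and parsing the second element works
  val price: Double = row.getSeq[String](2)(1).toDouble

  def main(args: Array[String]): Unit = {
    println(castAttempt) // a Failure wrapping a ClassCastException
    println(price)
  }
}
```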

goldentriangle
New Contributor II