11-09-2022 06:59 AM
Hello all,
I've been tasked with converting Scala Spark code to PySpark with minimal changes (essentially a literal translation).
I've come across some code that amounts to a list comprehension (a Scala for/yield comprehension). See the snippet below:
%scala
val desiredColumn = Seq("firstName", "middleName", "lastName")
val colSize = desiredColumn.size
val columnList = for (i <- 0 until colSize) yield $"elements".getItem(i).alias(desiredColumn(i))
print(columnList)
// df_nameSplit.select(columnList: _*).show(false)
Output for this code snippet:
Vector(elements[0] AS firstName, elements[1] AS middleName, elements[2] AS lastName)
desiredColumn: Seq[String] = List(firstName, middleName, lastName)
colSize: Int = 3
columnList: scala.collection.immutable.IndexedSeq[org.apache.spark.sql.Column] = Vector(elements[0] AS firstName, elements[1] AS middleName, elements[2] AS lastName)
Also, the schema of the `df_nameSplit` data frame is shown below; the `elements` column is a split version of the `name` column (a sketch of how it could have been derived follows the schema):
root
|-- name: string (nullable = true)
|-- dob_year: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: long (nullable = true)
|-- elements: array (nullable = true)
| |-- element: string (containsNull = false)
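For context, a minimal sketch of how the `elements` column could have been derived, assuming `name` is comma-separated (this step isn't shown above, so the exact call is an assumption):
from pyspark.sql.functions import split, col

# assumed derivation of `elements`: split the comma-separated name column
df_nameSplit = df.withColumn("elements", split(col("name"), ","))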
The PySpark version of the code I was able to come up with:
from pyspark.sql.functions import col

desired_columns = ["firstName", "middleName", "lastName"]
col_size = len(desired_columns)
col_list = [df_nameSplit.select(col("elements").getItem(i).alias(desired_columns[i])) for i in range(col_size)]
print(col_list)
# df_nameSplit.select(*col_list).display()
Output for PySpark code:
[DataFrame[firstName: string], DataFrame[middleName: string], DataFrame[lastName: string]]
Could someone help me with where I'm going wrong?
Tagging @Kaniz Fatma for better reach!
Labels: Pyspark, Scala spark, Spark scala, Transformation
Accepted Solutions
11-09-2022 01:39 PM
Hi @Riyaz Ali ,
check this one:
from pyspark.sql.functions import col

desired_columns = ["firstName", "middleName", "lastName"]
col_size = len(desired_columns)
col_list = [col("elements").getItem(i).alias(desired_columns[i]) for i in range(col_size)]
print(col_list)
the output is:
[Column<'elements[0] AS firstName'>, Column<'elements[1] AS middleName'>, Column<'elements[2] AS lastName'>]
test:
from pyspark.sql.types import StringType, ArrayType, StructType, StructField
arrayCol = ArrayType(StringType(),False)
schema = StructType([
StructField("id",StringType(),True),
StructField("elements",ArrayType(StringType()),True)
])
data = [
("1",["john","jack","doe"])
]
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()
root
|-- id: string (nullable = true)
|-- elements: array (nullable = true)
| |-- element: string (containsNull = true)
+---+-----------------+
| id| elements|
+---+-----------------+
| 1|[john, jack, doe]|
+---+-----------------+
df.select(*col_list).display()
output:
+---------+----------+--------+
|firstName|middleName|lastName|
+---------+----------+--------+
|john |jack |doe |
+---------+----------+--------+
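For what it's worth, the same column list can also be built with enumerate, which drops the separate col_size variable (purely a stylistic variant, not a required change):
from pyspark.sql.functions import col

# same result as above, iterating over the names directly instead of indexing
col_list = [col("elements").getItem(i).alias(name)
            for i, name in enumerate(desired_columns)]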
11-10-2022 01:38 AM
Thank you @Pat Sienkiewicz !
This makes a whole lotta sense! Not sure why I was selecting from a data frame when all I needed were columns.
11-10-2022 02:43 AM
Another follow-up question, if you don't mind, @Pat Sienkiewicz.
While trying to parse the name column into multiple columns, I came across the data below:
("James,\"A,B\", Smith", "2018", "M", 3000)
To parse these middle names that themselves contain commas, I was using the `from_csv` function.
The Scala Spark code looks like the below:
%scala
// using from_csv with the defined schema to split the columns
import org.apache.spark.sql.{functions => F}

val options = Map("sep" -> ",")
val df_split = df.select($"*", F.from_csv($"name", simpleSchema, options).alias("value_parsed"))
val df_multi_cols = df_split.select("*", "value_parsed.*").drop("value_parsed")
df.show(false)
df_multi_cols.show(false)
The schema that's mentioned above is as follows:
%scala
// schema in scala
val simpleSchema = new StructType()
.add("firstName", StringType)
.add("middleName",StringType)
.add("lastName",StringType)
The code that I came up with for PySpark:
# Schema in PySpark
from pyspark.sql.types import StructType, StringType
from pyspark.sql.functions import from_csv

simple_schema = (StructType()
.add('firstName', StringType())
.add('middleName', StringType())
.add('lastName', StringType())
)
options = {'sep':','}
df_split = df_is.select("*", from_csv(df_is.name, simple_schema, options).alias("value_parsed"))
#df_split.printSchema()
This throws an error: `TypeError: schema argument should be a column or string`
Following that error message, if I define the schema as a DDL string (in quotes), it works:
options = {'sep':','}
df_split = df_is.select("*", from_csv(df_is.name, "firstName string, middleName string, lastName string", options).alias("value_parsed"))
df_split.printSchema()
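If I wanted to keep a single StructType definition, one possible workaround (just a sketch; `ddl_schema` is my own name, and it assumes each field's simpleString() renders a DDL-compatible type) would be to derive the DDL string from the same StructType:
# assumed helper: build the DDL string from the StructType fields
ddl_schema = ", ".join(f"{f.name} {f.dataType.simpleString()}" for f in simple_schema.fields)
# for the schema above this yields "firstName string, middleName string, lastName string"

df_split = df_is.select("*", from_csv(df_is.name, ddl_schema, options).alias("value_parsed"))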
I'm intrigued as to why this works in Scala Spark but not in PySpark. Any leads would be greatly appreciated.
Best,
Riz

