11-09-2022 06:59 AM
Hello all,
I've been tasked with converting Scala Spark code to PySpark with minimal changes (essentially a literal translation).
I've come across some code that amounts to a list comprehension (a Scala for/yield). Code snippet below:
%scala
val desiredColumn = Seq("firstName", "middleName", "lastName")
val colSize = desiredColumn.size
val columnList = for (i <- 0 until colSize) yield $"elements".getItem(i).alias(desiredColumn(i))
print(columnList)
// df_nameSplit.select(columnList: _*).show(false)
Output for this code snippet:
Vector(elements[0] AS firstName, elements[1] AS middleName, elements[2] AS lastName)
desiredColumn: Seq[String] = List(firstName, middleName, lastName)
colSize: Int = 3
columnList: scala.collection.immutable.IndexedSeq[org.apache.spark.sql.Column] = Vector(elements[0] AS firstName, elements[1] AS middleName, elements[2] AS lastName)
Also, the schema of the `df_nameSplit` data frame is as below; the `elements` column is a split version of the `name` column:
root
|-- name: string (nullable = true)
|-- dob_year: string (nullable = true)
|-- gender: string (nullable = true)
|-- salary: long (nullable = true)
|-- elements: array (nullable = true)
| |-- element: string (containsNull = false)
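For context, `elements` is derived by splitting `name`. A minimal sketch of that step (assuming a simple split on spaces; the actual delimiter may differ):
from pyspark.sql.functions import split, col

# Hypothetical reconstruction of the split step: turn the full name into an array column
df_nameSplit = df.withColumn("elements", split(col("name"), " "))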
The PySpark version of the code I was able to come up with:
from pyspark.sql.functions import col

desired_columns = ["firstName", "middleName", "lastName"]
col_size = len(desired_columns)
col_list = [df_nameSplit.select(col("elements").getItem(i).alias(desired_columns[i])) for i in range(col_size)]
print(col_list)
# df_nameSplit.select(*col_list).display()
Output for PySpark code:
[DataFrame[firstName: string], DataFrame[middleName: string], DataFrame[lastName: string]]
Could someone help me with where I'm going wrong?
Tagging @Kaniz Fatma for better reach!
11-09-2022 01:39 PM
Hi @Riyaz Ali,
check this one:
from pyspark.sql.functions import col

desired_columns = ["firstName", "middleName", "lastName"]
col_size = len(desired_columns)
col_list = [col("elements").getItem(i).alias(desired_columns[i]) for i in range(col_size)]
print(col_list)
the output is:
[Column<'elements[0] AS firstName'>, Column<'elements[1] AS middleName'>, Column<'elements[2] AS lastName'>]
test:
from pyspark.sql.types import StructType, StructField, StringType, ArrayType

schema = StructType([
    StructField("id", StringType(), True),
    StructField("elements", ArrayType(StringType()), True)
])
data = [
    ("1", ["john", "jack", "doe"])
]
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()
root
|-- id: string (nullable = true)
|-- elements: array (nullable = true)
| |-- element: string (containsNull = true)
+---+-----------------+
| id| elements|
+---+-----------------+
| 1|[john, jack, doe]|
+---+-----------------+
df.select(*col_list).display()
output:
+---------+----------+--------+
|firstName|middleName|lastName|
+---------+----------+--------+
|john |jack |doe |
+---------+----------+--------+
11-10-2022 01:38 AM
Thank you @Pat Sienkiewicz!
This makes a whole lotta sense! Not sure why I was selecting from a data frame when all I needed were columns.
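For anyone who lands here later, the corrected version of my original snippet (a minimal sketch, building plain Column expressions instead of one DataFrame per column):
from pyspark.sql.functions import col

desired_columns = ["firstName", "middleName", "lastName"]
# Column expressions, not DataFrames; enumerate avoids the separate col_size variable
col_list = [col("elements").getItem(i).alias(name) for i, name in enumerate(desired_columns)]
# df_nameSplit.select(*col_list).show(truncate=False)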
11-10-2022 02:43 AM
Another follow-up question, if you don't mind, @Pat Sienkiewicz.
While trying to parse the name column into multiple columns, I came across the data below:
("James,\"A,B\", Smith", "2018", "M", 3000)
To parse these middle names that themselves contain commas, I was using the `from_csv` function.
The Scala Spark code looks like the below:
%scala
// using from_csv function with defined Schema to split the columns.
val options = Map("sep" -> ",")
val df_split = df.select($"*", F.from_csv($"name", simpleSchema, options).alias("value_parsed"))
val df_multi_cols = df_split.select("*", "value_parsed.*").drop("value_parsed")
df.show(false)
df_multi_cols.show(false)
The schema that's mentioned above is as follows:
%scala
// schema in scala
val simpleSchema = new StructType()
.add("firstName", StringType)
.add("middleName",StringType)
.add("lastName",StringType)
The code I came up with for PySpark:
# Schema in PySpark
from pyspark.sql.types import StructType, StringType
from pyspark.sql.functions import from_csv

simple_schema = (StructType()
    .add('firstName', StringType())
    .add('middleName', StringType())
    .add('lastName', StringType())
)
options = {'sep': ','}
df_split = df_is.select("*", from_csv(df_is.name, simple_schema, options).alias("value_parsed"))
#df_split.printSchema()
This throws up an error: `TypeError: schema argument should be a column or string`
Following the error, if I instead define the schema as a DDL-style string (in quotes), it works:
options = {'sep':','}
df_split = df_is.select("*", from_csv(df_is.name, "firstName string, middleName string, lastName string", options).alias("value_parsed"))
df_split.printSchema()
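One thing I might try next (an untested sketch) is deriving the string schema from the StructType I already have, so the schema isn't written twice; this assumes from_csv accepts the struct<...> form that simpleString() produces:
from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StringType

simple_schema = (StructType()
    .add('firstName', StringType())
    .add('middleName', StringType())
    .add('lastName', StringType())
)
options = {'sep': ','}
# simpleString() renders the schema as "struct<firstName:string,...>";
# passing that string avoids the TypeError, assuming from_csv's parser accepts it
df_split = df_is.select("*", from_csv(df_is.name, simple_schema.simpleString(), options).alias("value_parsed"))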
I'm intrigued as to why this works in Scala Spark but not in PySpark. Any leads would be greatly appreciated.
Best,
Riz