cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

Converting a transformation written in Spark Scala to PySpark

RiyazAli
Contributor III

Hello all,

I've been tasked to convert a Scala Spark code to PySpark code with minimal changes (kinda literal translation).

I've come across some code that claims to be a list comprehension. Look below for code snippet:

%scala
val desiredColumn = Seq("firstName", "middleName", "lastName")
val colSize = desiredColumn.size
 
val columnList = for (i <- 0 until colSize) yield $"elements".getItem(i).alias(desiredColumn(i))
 
print(columnList)
 
// df_nameSplit.select(columnList: _ *).show(false)

Output for this code snippet:

Vector(elements[0] AS firstName, elements[1] AS middleName, elements[2] AS lastName)desiredColumn: Seq[String] = List(firstName, middleName, lastName)
colSize: Int = 3
columnList: scala.collection.immutable.IndexedSeq[org.apache.spark.sql.Column] = Vector(elements[0] AS firstName, elements[1] AS middleName, elements[2] AS lastName)

Also, the schema of the `df_nameSplit` data frame is as below and the elements column is a split version of the `name` column:

root
 |-- name: string (nullable = true)
 |-- dob_year: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- elements: array (nullable = true)
 |    |-- element: string (containsNull = false)

The PySpark version of the code I was able to come-up with:

desired_columns = ["firstName", "middleName", "lastName"]
 
col_size = len(desired_columns)
 
col_list = [df_nameSplit.select(col("elements").getItem(i).alias(desired_columns[i])) for i in range(col_size)]
 
print(col_list)
 
# df_nameSplit.select(*col_list).display()

Output for PySpark code:

[DataFrame[firstName: string], DataFrame[middleName: string], DataFrame[lastName: string]]

Could someone help me with where I'm going wrong?

Tagging @Kaniz Fatma​ for better reach!

1 ACCEPTED SOLUTION

Accepted Solutions

Pat
Honored Contributor III

Hi @Riyaz Ali​ ,

check this one:

desired_columns = ["firstName", "middleName", "lastName"]
 
col_size = len(desired_columns)
 
col_list = [col("elements").getItem(i).alias(desired_columns[i]) for i in range(col_size)]
 
print(col_list)

the output is:

[Column<'elements[0] AS firstName'>, Column<'elements[1] AS middleName'>, Column<'elements[2] AS lastName'>]

 test:

from pyspark.sql.types import StringType, ArrayType
arrayCol = ArrayType(StringType(),False)
 
schema = StructType([ 
    StructField("id",StringType(),True), 
    StructField("elements",ArrayType(StringType()),True)
  ])
 
 
data = [
 ("1",["john","jack","doe"])
]
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()
 
root
 |-- id: string (nullable = true)
 |-- elements: array (nullable = true)
 |    |-- element: string (containsNull = true)
 
+---+-----------------+
| id|         elements|
+---+-----------------+
|  1|[john, jack, doe]|
+---+-----------------+
 
df.select(*col_list).display()
 
 
output:
+---------+----------+--------+
|firstName|middleName|lastName|
+---------+----------+--------+
|john     |jack      |doe     |
+---------+----------+--------+

View solution in original post

3 REPLIES 3

Pat
Honored Contributor III

Hi @Riyaz Ali​ ,

check this one:

desired_columns = ["firstName", "middleName", "lastName"]
 
col_size = len(desired_columns)
 
col_list = [col("elements").getItem(i).alias(desired_columns[i]) for i in range(col_size)]
 
print(col_list)

the output is:

[Column<'elements[0] AS firstName'>, Column<'elements[1] AS middleName'>, Column<'elements[2] AS lastName'>]

 test:

from pyspark.sql.types import StringType, ArrayType
arrayCol = ArrayType(StringType(),False)
 
schema = StructType([ 
    StructField("id",StringType(),True), 
    StructField("elements",ArrayType(StringType()),True)
  ])
 
 
data = [
 ("1",["john","jack","doe"])
]
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()
 
root
 |-- id: string (nullable = true)
 |-- elements: array (nullable = true)
 |    |-- element: string (containsNull = true)
 
+---+-----------------+
| id|         elements|
+---+-----------------+
|  1|[john, jack, doe]|
+---+-----------------+
 
df.select(*col_list).display()
 
 
output:
+---------+----------+--------+
|firstName|middleName|lastName|
+---------+----------+--------+
|john     |jack      |doe     |
+---------+----------+--------+

RiyazAli
Contributor III

Thank you @Pat Sienkiewicz​ !

This makes a whole lotta sense! Not sure why I was selecting from a data frame when all I needed were columns.

RiyazAli
Contributor III

Another follow-up question, if you don't mind. @Pat Sienkiewicz​ 

As I was trying to parse the name column into multiple columns. I came across the data below:

("James,\"A,B\", Smith", "2018",  "M", 3000)

In order to parse these comma-included middle names, I was using the `from_csv` function.

The Scala Spark code looks like the below:

%scala
// using from_csv function with defined Schema to split the columns.
 
val options = Map("sep" -> ",")
 
val df_split = df.select($"*", F.from_csv($"name", simpleSchema, options).alias("value_parsed"))
 
val df_multi_cols = df_split.select("*", "value_parsed.*").drop("value_parsed")
 
df.show(false)
df_multi_cols.show(false)

The schema that's mentioned above is as follows:

%scala
// schema in scala
 
val simpleSchema = new StructType()
                    .add("firstName", StringType)
                    .add("middleName",StringType)
                    .add("lastName",StringType)

Now the code that I came up for PySpark is that:

#Schema in PySpark
simple_schema = (StructType()
                 .add('firstName', StringType())
                 .add('middleName', StringType())
                 .add('lastName', StringType())
                )
options = {'sep':','}
 
df_split = df_is.select("*", from_csv(df_is.name, simple_schema, options).alias("value_parsed"))
 
#df_split.printSchema()

This throws up an error: `TypeError: schema argument should be a column or string`

Now following the error, if I define the schema in the SQL style (in quotes), it works.

options = {'sep':','}
 
df_split = df_is.select("*", from_csv(df_is.name, "firstName string, middleName string, lastName string", options).alias("value_parsed"))
 
df_split.printSchema()

I'm intrigued as to why it works in Scala Spark and why not in PySpark. Any leads would be greatly appreciated.

Best,

Riz

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.