Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Converting a transformation written in Spark Scala to PySpark

RiyazAli
Valued Contributor

Hello all,

I've been tasked with converting Scala Spark code to PySpark with minimal changes (essentially a literal translation).

I've come across some code that is essentially a list comprehension (a Scala for-comprehension). See the snippet below:

%scala
val desiredColumn = Seq("firstName", "middleName", "lastName")
val colSize = desiredColumn.size
 
val columnList = for (i <- 0 until colSize) yield $"elements".getItem(i).alias(desiredColumn(i))
 
print(columnList)
 
// df_nameSplit.select(columnList: _*).show(false)

Output for this code snippet:

Vector(elements[0] AS firstName, elements[1] AS middleName, elements[2] AS lastName)
desiredColumn: Seq[String] = List(firstName, middleName, lastName)
colSize: Int = 3
columnList: scala.collection.immutable.IndexedSeq[org.apache.spark.sql.Column] = Vector(elements[0] AS firstName, elements[1] AS middleName, elements[2] AS lastName)

Also, the schema of the `df_nameSplit` DataFrame is shown below; the `elements` column is the result of splitting the `name` column (a rough sketch of that split step follows the schema):

root
 |-- name: string (nullable = true)
 |-- dob_year: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)
 |-- elements: array (nullable = true)
 |    |-- element: string (containsNull = false)
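
For context, `elements` was created by splitting `name`, roughly along these lines (a simplified, hypothetical sketch; the actual split logic isn't shown here and the comma separator is an assumption):

from pyspark.sql.functions import split, col
 
# Hypothetical: derive the array<string> `elements` column by splitting
# `name` on commas (assumed separator; adjust to the real split logic).
df_nameSplit = df.withColumn("elements", split(col("name"), ","))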

The PySpark version of the code I was able to come up with:

desired_columns = ["firstName", "middleName", "lastName"]
 
col_size = len(desired_columns)
 
col_list = [df_nameSplit.select(col("elements").getItem(i).alias(desired_columns[i])) for i in range(col_size)]
 
print(col_list)
 
# df_nameSplit.select(*col_list).display()

Output for the PySpark code:

[DataFrame[firstName: string], DataFrame[middleName: string], DataFrame[lastName: string]]

Could someone help me with where I'm going wrong?

Tagging @Kaniz Fatma for better reach!


3 REPLIES

Pat
Honored Contributor III

Hi @Riyaz Ali,

check this one:

from pyspark.sql.functions import col
 
desired_columns = ["firstName", "middleName", "lastName"]
 
col_size = len(desired_columns)
 
# build Column expressions (not DataFrames), one per desired name
col_list = [col("elements").getItem(i).alias(desired_columns[i]) for i in range(col_size)]
 
print(col_list)

the output is:

[Column<'elements[0] AS firstName'>, Column<'elements[1] AS middleName'>, Column<'elements[2] AS lastName'>]

Test:

from pyspark.sql.types import StructType, StructField, StringType, ArrayType
 
schema = StructType([
    StructField("id", StringType(), True),
    StructField("elements", ArrayType(StringType()), True)
])
 
 
data = [
 ("1",["john","jack","doe"])
]
 
df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()
 
root
 |-- id: string (nullable = true)
 |-- elements: array (nullable = true)
 |    |-- element: string (containsNull = true)
 
+---+-----------------+
| id|         elements|
+---+-----------------+
|  1|[john, jack, doe]|
+---+-----------------+
 
df.select(*col_list).display()
 
 
output:
+---------+----------+--------+
|firstName|middleName|lastName|
+---------+----------+--------+
|john     |jack      |doe     |
+---------+----------+--------+

RiyazAli
Valued Contributor

Thank you @Pat Sienkiewicz!

This makes a whole lotta sense! Not sure why I was selecting from a data frame when all I needed were columns.

RiyazAli
Valued Contributor

Another follow-up question, if you don't mind, @Pat Sienkiewicz.

As I was trying to parse the `name` column into multiple columns, I came across the data below:

("James,\"A,B\", Smith", "2018",  "M", 3000)

In order to parse these names with embedded commas, I was using the `from_csv` function.

The Scala Spark code looks like the below:

%scala
// using the from_csv function with a defined schema to split the columns
// (assumes: import org.apache.spark.sql.{functions => F})
 
val options = Map("sep" -> ",")
 
val df_split = df.select($"*", F.from_csv($"name", simpleSchema, options).alias("value_parsed"))
 
val df_multi_cols = df_split.select("*", "value_parsed.*").drop("value_parsed")
 
df.show(false)
df_multi_cols.show(false)

The schema that's mentioned above is as follows:

%scala
// schema in scala
 
val simpleSchema = new StructType()
                    .add("firstName", StringType)
                    .add("middleName",StringType)
                    .add("lastName",StringType)

Now, the PySpark code that I came up with is:

#Schema in PySpark
simple_schema = (StructType()
                 .add('firstName', StringType())
                 .add('middleName', StringType())
                 .add('lastName', StringType())
                )
options = {'sep':','}
 
df_split = df_is.select("*", from_csv(df_is.name, simple_schema, options).alias("value_parsed"))
 
#df_split.printSchema()

This throws up an error: `TypeError: schema argument should be a column or string`

Following the error, if I instead define the schema as a DDL-style string (in quotes), it works:

options = {'sep':','}
 
df_split = df_is.select("*", from_csv(df_is.name, "firstName string, middleName string, lastName string", options).alias("value_parsed"))
 
df_split.printSchema()
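
One possible workaround sketch is to reuse the StructType by converting it to its DDL string, assuming `from_csv` accepts the `struct<...>` form that `StructType.simpleString()` returns:

from pyspark.sql.functions import from_csv
from pyspark.sql.types import StructType, StringType
 
simple_schema = (StructType()
                 .add('firstName', StringType())
                 .add('middleName', StringType())
                 .add('lastName', StringType())
                )
options = {'sep': ','}
 
# simpleString() renders the StructType as a DDL-style string, e.g.
# "struct<firstName:string,middleName:string,lastName:string>"
df_split = df_is.select(
    "*",
    from_csv(df_is.name, simple_schema.simpleString(), options).alias("value_parsed")
)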

That said, I'm still intrigued as to why the StructType works in Spark Scala but not in PySpark. Any leads would be greatly appreciated.

Best,

Riz
