How to Change the Schema of a Spark SQL DataFrame

Dee
New Contributor

I am new to Spark and just started an online PySpark tutorial. I uploaded the JSON data to Databricks and wrote the commands as follows:

df = sqlContext.sql("SELECT * FROM people_json")

df.printSchema()

from pyspark.sql.types import *

data_schema = [StructField('age',IntegerType(),True), StructField('name',StringType(),True)]

final_struc = StructType(fields=data_schema)

### The tutorial says to run this command

df = spark.read.json('people_json',schema=final_struc)

### But this is not working. Why is this not working, and what will work? Thanks!

1 ACCEPTED SOLUTION

dennyglee
New Contributor III

The first part of your query,

df = sqlContext.sql("SELECT * FROM people_json")
df.printSchema()

creates the df DataFrame by reading an existing table.
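
For reference, spark.table does the same thing more directly; a minimal equivalent, using the same table name:

df = spark.table("people_json")  # same as sqlContext.sql("SELECT * FROM people_json")
df.printSchema()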

The second part of your query uses spark.read.json, which expects a path to a JSON file rather than a table name. For example, the following code does work:

from pyspark.sql.types import *
data_schema = [StructField('age', IntegerType(), True), StructField('name', StringType(), True)]
final_struc = StructType(fields=data_schema)
df = spark.read.json("/my/directory/people.json", schema=final_struc)
df.show() 

with the output being:

+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
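
One note on the path: /my/directory/people.json above is just an example location. If you uploaded the file through the Databricks UI, it typically lands under /FileStore/tables/, so the read might look like this (the exact filename is an assumption; check where your upload actually landed):

# Hypothetical path: adjust to wherever your uploaded file lives in DBFS
df = spark.read.json("/FileStore/tables/people_json.json", schema=final_struc)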

If you would like to change the schema of the table from your first query, you can do either of the following:

1. Execute Spark SQL to cast the columns, such as

df.createOrReplaceTempView("df")
df2 = spark.sql("select cast(age as int) as age, cast(name as string) as name from df")

2. Use the PySpark DataFrame API to cast the columns

from pyspark.sql.types import IntegerType
df2 = df.withColumn("age", df["age"].cast(IntegerType()))
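
Either way, a quick printSchema() confirms the cast took effect (expected output sketched below, assuming the sample people.json data):

df2.printSchema()
# root
#  |-- age: integer (nullable = true)
#  |-- name: string (nullable = true)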

HTH!


