Data Engineering

how to read schema from text file stored in cloud storage

saninanda
New Contributor II

I have a file, a.csv or a.parquet. When creating a DataFrame while reading it, we can explicitly define the schema with StructType. Instead of writing the schema in the notebook, I want to create one schema for all my CSV files, say csv_schema, and store it in cloud storage. If columns are added or removed, I will make that change in the csv_schema file separately.

In the notebook, when creating the DataFrame while reading the file, I want to pass this schema, which is stored in a separate file. Please suggest whether we can write a function in Python, or any other idea, to automate schema creation and apply it to DataFrames for different file systems.

So it's like this: I have a schema file, a.schema, which holds a generic schema definition for all the Parquet files (or any other files). Something like:

The a.schema text file contains the details below:

schema1=StructType([StructField("x1", StringType(), True),StructField("Name", StringType(), True),StructField("PRICE", DoubleType(), True)])

Read a.schema from storage in the notebook and create the required schema, which then needs to be passed to the DataFrame:

df=spark.read.schema(generic schema).parquet ..

7 REPLIES

shyam_9
Valued Contributor

Hi @sani nanda,

Please follow the steps below (Scala):

  • Read the schema file as a CSV, setting header to true. This will give an empty DataFrame, but with the correct header.
  • Extract the column names from that schema file.
val column_names = spark.read.option("header", true).csv(schemafile).columns
  • Now read the data file and change the default column names to the ones in the schema DataFrame.
val df = spark.read.option("header", "false").option("inferSchema", "true").csv(datafile).toDF(column_names: _*)

saninanda
New Contributor II

Hi @shyamspr

Thanks for sharing the answer. Actually, I have tried an approach like that, taking the header from the raw file and then adding it again. But the issue is that I want to explicitly set the data types as required, without reading the data to infer the schema.

Is there any way to add the data type along with the column name programmatically, for cases where we have 30+ columns?

That's why I am trying to keep a separate schema file where we have the column names and data types in a list or as StructFields.

shyam_9
Valued Contributor

You can also do this by using selectExpr

df2 = df.selectExpr("CAST (`col1` as string) as `col1`") 

and also one more method as in the below image

[image]
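For many columns, the selectExpr approach above could be driven from a name-to-type mapping instead of hand-written expressions. The following is only a sketch: the helper name build_cast_exprs and the sample mapping (borrowed from the column names in the question) are assumptions, and in practice the mapping could be loaded from a file in storage.

```python
def build_cast_exprs(column_types):
    """Turn a {"column": "sql_type"} mapping into selectExpr-style CAST expressions."""
    return [
        f"CAST(`{name}` AS {dtype}) AS `{name}`"
        for name, dtype in column_types.items()
    ]


# Hypothetical mapping; the column names come from the example in the question.
column_types = {"x1": "string", "Name": "string", "PRICE": "double"}
exprs = build_cast_exprs(column_types)

# In a notebook with an existing DataFrame `df` (assumed), apply all casts at once:
# df2 = df.selectExpr(*exprs)
```

This still reads the data first and casts afterwards, so it does not replace passing an explicit schema at read time.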

saninanda
New Contributor II

@shyamspr, thanks for looking, but as I said, in both scenarios we need to add the column data types explicitly within the notebook. Consider 30 to 40+ columns. It's not reusable, since each notebook needs to add the schema.

As you suggested, I am storing the custom schema structure in one file, custom_schema.txt. I was trying to apply the schema from that file, where the StructType and fields are defined, while reading the data from the file path and creating the DataFrame,

but I am not able to make it work. So I was wondering whether a Python function or utility could help build reusable code that reads the schema definition from a file so that we can use it when creating the DataFrame.

shaunryan
New Contributor II

@sunil nanda

I wrote the code below to read schemas from my git-controlled schema library on Azure storage. It converts a Spark JSON schema in a file to a Spark schema.

This is an example of a schema with 1 column:

{
    "type": "struct",
    "fields": [
        {
            "name": "Data",
            "type": "string",
            "nullable": true,
            "metadata": {}
        }
    ]
}

Don't type this out. You can get it initially during development by loading the data with an inferred schema, copying the schema out into a file, pretty-printing it, and then reviewing and tweaking it into the desired schema. Put it in a VS Code git project and deploy it to the lake.

To get the schema out of an inferred df do this:

df.schema.json
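In PySpark the same schema JSON is available through the method call df.schema.json(). As a rough sketch of the "pretty print it" step, the standard json module can re-indent the compact string before it is saved to a file; the helper name pretty_schema_json is an assumption for illustration, and the sample string stands in for what df.schema.json() would return.

```python
import json


def pretty_schema_json(compact_json):
    """Re-indent a compact Spark schema JSON string so it is easy to review and edit."""
    return json.dumps(json.loads(compact_json), indent=4)


# Stand-in for the string returned by df.schema.json() on a one-column DataFrame.
compact = '{"type":"struct","fields":[{"name":"Data","type":"string","nullable":true,"metadata":{}}]}'
pretty = pretty_schema_json(compact)

# In a notebook with an inferred DataFrame `df` (assumed):
# print(pretty_schema_json(df.schema.json()))
```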

Code to read the schema from the attached lake storage. The path can be azure or s3 storage protocol paths but you have to mount it first in the session or use passthrough... don't use global mounts, that's really bad security practice and incredibly lazy!!

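import scala.util.{Failure, Success, Try}
import org.apache.spark.sql.types.{DataType, StructType}

// Loads a Spark JSON schema file and converts it to a StructType.
// Returns null when the path is empty or the schema cannot be read or parsed.
// logError is assumed to be provided by the enclosing class.
def loadSparkSchema(path: String): StructType = {
  if (path == null || path.isEmpty) return null
  try {
    // Read the schema file through the CSV reader, using "\n" as the delimiter
    val filedf = spark.read
      .option("multiLine", true)
      .option("mode", "PERMISSIVE")
      .option("delimiter", "\n")
      .csv(path)
    // Concatenate the lines back into a single JSON string
    var file = ""
    for (line <- filedf.collect()) {
      file = file + line(0)
    }
    Try(DataType.fromJson(file).asInstanceOf[StructType]) match {
      case Success(s) => s
      case Failure(f) =>
        logError(s"Failed to parse spark schema from json file at ${path}\n ${f}")
        null
    }
  } catch {
    case e: Exception =>
      logError(s"Failed to parse spark schema from json file at ${path}\n ${e.getMessage()}")
      null
  }
}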
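Since the question asks for Python, here is a rough PySpark counterpart of the Scala function above. It is a sketch rather than a drop-in port: the function names are assumptions, an existing SparkSession is passed in as spark, the file is read with spark.read.text(..., wholetext=True), and bad input raises an error instead of being logged. StructType.fromJson takes the parsed dictionary, not the raw string.

```python
import json


def parse_schema_json(json_text):
    """Parse Spark's JSON schema text and check that it describes a struct."""
    schema_dict = json.loads(json_text)
    if (
        not isinstance(schema_dict, dict)
        or schema_dict.get("type") != "struct"
        or not isinstance(schema_dict.get("fields"), list)
    ):
        raise ValueError("file does not contain a Spark struct schema")
    return schema_dict


def load_spark_schema(spark, path):
    """Read a JSON schema file from storage and build a StructType from it."""
    # Imported here so the parsing helper above works without PySpark installed.
    from pyspark.sql.types import StructType

    # wholetext=True returns the whole file as a single row.
    rows = spark.read.text(path, wholetext=True).collect()
    return StructType.fromJson(parse_schema_json(rows[0][0]))


# The one-column example schema from this post, as a string.
sample = '{"type": "struct", "fields": [{"name": "Data", "type": "string", "nullable": true, "metadata": {}}]}'
parsed = parse_schema_json(sample)

# In a notebook (the paths here are placeholders):
# schema = load_spark_schema(spark, "<path to schema json>")
# df = spark.read.schema(schema).parquet("<path to data>")
```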

narahari_1994
New Contributor II

Python Function:

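from pyspark.sql.types import (StructType, StringType, IntegerType,
                               TimestampType, DoubleType, DateType, DecimalType)

def read_schema(arg):
    # Map the type names used in the schema file to Spark types
    d_types = {
        "varchar": StringType(),
        "integer": IntegerType(),
        "timestamp": TimestampType(),
        "double": DoubleType(),
        "date": DateType(),
        "decimal": DecimalType()
    }
    split_values = arg.split(",")
    sch = StructType()
    for i in split_values:
        # Each entry looks like "column_name|type_name"
        x = i.strip().split("|")
        sch.add(x[0], d_types[x[1]], True)
    return sch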

Sample schema input file:

id|integer,
cust_nr|varchar,
bus_name|varchar,
bus_tym|varchar,
bus_desciprion|varchar

The code needs to be implemented in the PySpark job:

textRDD1 = sc.textFile("Schema_file.txt")
llist = textRDD1.collect()
# Join the lines of the schema file into one comma-separated string
listToStr = ''.join([str(elem) for elem in llist])
sch = read_schema(listToStr)
df = spark.read.csv(path='Table_PATH', schema=sch, header=False, sep='|')
df.show()
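A possible variation on the function above: DataFrameReader.schema also accepts a DDL-formatted string such as "id INT, cust_nr STRING", so the same schema file could be converted to plain text without building a StructType. This is a sketch under that assumption; the helper name schema_file_to_ddl and the type mapping are illustrative, with DECIMAL(10,0) chosen to match the default DecimalType().

```python
# Maps the type names used in the schema file to Spark SQL type names.
DDL_TYPES = {
    "varchar": "STRING",
    "integer": "INT",
    "timestamp": "TIMESTAMP",
    "double": "DOUBLE",
    "date": "DATE",
    "decimal": "DECIMAL(10,0)",
}


def schema_file_to_ddl(text):
    """Convert 'name|type,' entries into a DDL schema string."""
    columns = []
    for entry in text.replace("\n", "").split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate a trailing comma or blank lines
        name, type_name = (part.strip() for part in entry.split("|"))
        columns.append(f"`{name}` {DDL_TYPES[type_name]}")
    return ", ".join(columns)


ddl = schema_file_to_ddl("id|integer,\ncust_nr|varchar,\nbus_name|varchar")

# In the PySpark job, reusing the names from the snippet above:
# df = spark.read.schema(ddl).csv('Table_PATH', header=False, sep='|')
```

An unknown type name raises a KeyError, which surfaces typos in the schema file early.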

Nakeman
New Contributor II

@shyampsr big thanks, was searching for the solution almost 3 hours 😞
