How to generate schema with org.apache.spark.sql.functions.schema_of_csv?

aupres
New Contributor III

Hello, I use Spark 3.4.1 (Hadoop 3 build) on Windows 11, and I am struggling to generate the schema of CSV data with the schema_of_csv function. Below is my Java code.

 

Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("kafka.bootstrap.servers", "localhost:9092");
kafkaParams.put("subscribe", "topic_unempl_ann");
kafkaParams.put("startingOffsets", "earliest");

SparkSession spark = SparkSession.builder().master("local[*]").appName("kafka_test").getOrCreate();
Dataset<Row> df = spark.read().format("kafka")
            .options(kafkaParams)
            .load()
            .selectExpr("CAST(value AS STRING) as column").filter(not(col("column").startsWith("date")));

Map<String, String> options = new HashMap<String, String>();
options.put("delimiter", ",");
options.put("header", "true");
options.put("inferSchema", "true");
         
String schemaString = "date,value,id,title,state,frequency_short,units_short,seasonal_adjustment_short";

Column schema = schema_of_csv(schemaString);

Dataset<Row> parsed = df.select(from_csv(col("column"), schema, options).as("entityStoredPojo"))
                 .selectExpr("entityStoredPojo.date", "entityStoredPojo.value", "entityStoredPojo.id", 
                        "entityStoredPojo.title", "entityStoredPojo.state", "entityStoredPojo.frequency_short", 
                        "entityStoredPojo.units_short", "entityStoredPojo.seasonal_adjustment_short").toDF();

 

But this code throws the following error:

 

Exception in thread "main" org.apache.spark.sql.AnalysisException: [FIELD_NOT_FOUND] No such struct field `date` in `_c0`, `_c1`, `_c2`, `_c3`, `_c4`, `_c5`, `_c6`, `_c7`.; line 1 pos 0
        at org.apache.spark.sql.errors.QueryCompilationErrors$.noSuchStructFieldInGivenFieldsError(QueryCompilationErrors.scala:2115)
        at org.apache.spark.sql.catalyst.expressions.ExtractValue$.findField(complexTypeExtractors.scala:83)
        at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:56)
        at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.$anonfun$resolve$3(package.scala:363)
        at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:169)
        at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:165)
        at scala.collection.immutable.List.foldLeft(List.scala:79)
        at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:362)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:115)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpressionByPlanChildren$2(ColumnResolutionHelper.scala:378)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$3(ColumnResolutionHelper.scala:158)
        at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:100)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$1(ColumnResolutionHelper.scala:165)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.innerResolve$1(ColumnResolutionHelper.scala:136)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpression(ColumnResolutionHelper.scala:195)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren(ColumnResolutionHelper.scala:385)      
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren$(ColumnResolutionHelper.scala:358)     
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.resolveExpressionByPlanChildren(Analyzer.scala:1504)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.$anonfun$applyOrElse$110(Analyzer.scala:1638)    
        at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:75)
        at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:35)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1638)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1529)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)        
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)        
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1529)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1504)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
        at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:169)
        at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:165)
        at scala.collection.immutable.List.foldLeft(List.scala:79)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
        at scala.collection.immutable.List.foreach(List.scala:333)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
        at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:4196)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1578)
        at org.apache.spark.sql.Dataset.selectExpr(Dataset.scala:1612)
        at org.apache.spark.sql.Dataset.selectExpr(Dataset.scala:1611)
        at com.aaa.etl.processor.Test_CSV.main(Test_CSV.java:53)

 

I think the schema is not generated correctly: instead of my column names, it contains default column names such as _c0, _c1, _c2.

Please let me know how to generate the schema correctly with the schema_of_csv function, whose parameter type is String.

 

Accepted Solutions

aupres
New Contributor III

@Retired_mod wrote:

Hi @aupres, to generate a schema with org.apache.spark.sql.functions.schema_of_csv, you can use the following syntax:

scala
import org.apache.spark.sql.functions.schema_of_csv

val csvData = "1,abc"
val schema = schema_of_csv(csvData)

This will generate a schema for the CSV data in DDL format. You can then use this schema to define the schema of a DataFrame or a table.

Sources: https://docs.databricks.com/sql/language-manual/functions/schema_of_csv.html


Thanks for your reply, but I am afraid your code is Scala. Could you please provide an example written in Java? The schema_of_csv function still throws the exception with the schema string above.

aupres
New Contributor III

I solved this issue by using the org.apache.spark.sql.functions.lit method. Thank you anyway.
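
For readers who land here with the same error, here is a minimal sketch of what the lit-based fix might look like. The poster did not share the final code, so this is an assumption. The error message lists the fields as `_c0` to `_c7`, which fits schema_of_csv treating its string argument as a sample CSV row rather than as a list of column names. One way around this is to build a DDL-formatted schema string from the header and pass it as a literal column. The helper class `CsvHeaderToDdl` and its `toDdl` method are hypothetical names, not part of Spark, and typing every column as STRING is also an assumption. The Spark calls appear only as comments so that the block runs without Spark on the classpath.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Hypothetical helper (not part of Spark): turns a CSV header line into a DDL schema string. */
final class CsvHeaderToDdl {

    /** Maps each header name to "`name` STRING"; typing every column as STRING is an assumption. */
    static String toDdl(String headerLine) {
        return Arrays.stream(headerLine.split(","))
                .map(String::trim)
                .map(name -> "`" + name + "` STRING")
                .collect(Collectors.joining(", "));
    }

    public static void main(String[] args) {
        // The header string from the question.
        String header = "date,value,id,title,state,frequency_short,units_short,seasonal_adjustment_short";
        String ddl = toDdl(header);
        System.out.println(ddl);

        // With Spark on the classpath, the DDL string would be wrapped in lit(...)
        // and used in place of schema_of_csv(schemaString) from the question:
        //   Column schema = lit(ddl);
        //   df.select(from_csv(col("column"), schema, options).as("entityStoredPojo"));
    }
}
```

With a schema built this way, the struct fields carry the header names, so expressions such as `entityStoredPojo.date` should resolve. If typed columns are needed, the STRING types in the DDL string can be replaced by hand, for example `value DOUBLE`.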
