Community Platform Discussions

How to generate schema with org.apache.spark.sql.functions.schema_of_csv?

aupres
New Contributor III

Hello, I use Spark 3.4.1 with Hadoop 3 on Windows 11, and I am struggling to generate the schema of CSV data with the schema_of_csv function. Below is my Java code.

 

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;
import static org.apache.spark.sql.functions.not;
import static org.apache.spark.sql.functions.schema_of_csv;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Kafka source configuration
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("kafka.bootstrap.servers", "localhost:9092");
kafkaParams.put("subscribe", "topic_unempl_ann");
kafkaParams.put("startingOffsets", "earliest");

SparkSession spark = SparkSession.builder().master("local[*]").appName("kafka_test").getOrCreate();

// Read the topic, cast the Kafka value to a string, and drop the header rows
Dataset<Row> df = spark.read().format("kafka")
        .options(kafkaParams)
        .load()
        .selectExpr("CAST(value AS STRING) as column")
        .filter(not(col("column").startsWith("date")));

// CSV parsing options
Map<String, String> options = new HashMap<>();
options.put("delimiter", ",");
options.put("header", "true");
options.put("inferSchema", "true");

// Header line of the CSV data
String schemaString = "date,value,id,title,state,frequency_short,units_short,seasonal_adjustment_short";

// Generate the schema from the header string
Column schema = schema_of_csv(schemaString);

// Parse each CSV line into a struct and select the individual fields
Dataset<Row> parsed = df.select(from_csv(col("column"), schema, options).as("entityStoredPojo"))
        .selectExpr("entityStoredPojo.date", "entityStoredPojo.value", "entityStoredPojo.id",
                "entityStoredPojo.title", "entityStoredPojo.state", "entityStoredPojo.frequency_short",
                "entityStoredPojo.units_short", "entityStoredPojo.seasonal_adjustment_short").toDF();

 

But this code throws the error below:

 

Exception in thread "main" org.apache.spark.sql.AnalysisException: [FIELD_NOT_FOUND] No such struct field `date` in `_c0`, `_c1`, `_c2`, `_c3`, `_c4`, `_c5`, `_c6`, `_c7`.; line 1 pos 0
        at org.apache.spark.sql.errors.QueryCompilationErrors$.noSuchStructFieldInGivenFieldsError(QueryCompilationErrors.scala:2115)
        at org.apache.spark.sql.catalyst.expressions.ExtractValue$.findField(complexTypeExtractors.scala:83)
        at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:56)
        at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.$anonfun$resolve$3(package.scala:363)
        at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:169)
        at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:165)
        at scala.collection.immutable.List.foldLeft(List.scala:79)
        at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:362)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:115)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpressionByPlanChildren$2(ColumnResolutionHelper.scala:378)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$3(ColumnResolutionHelper.scala:158)
        at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:100)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$1(ColumnResolutionHelper.scala:165)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.innerResolve$1(ColumnResolutionHelper.scala:136)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpression(ColumnResolutionHelper.scala:195)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren(ColumnResolutionHelper.scala:385)      
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren$(ColumnResolutionHelper.scala:358)     
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.resolveExpressionByPlanChildren(Analyzer.scala:1504)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.$anonfun$applyOrElse$110(Analyzer.scala:1638)    
        at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:75)
        at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:35)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1638)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1529)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)        
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)        
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1529)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1504)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
        at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:169)
        at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:165)
        at scala.collection.immutable.List.foldLeft(List.scala:79)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
        at scala.collection.immutable.List.foreach(List.scala:333)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
        at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:4196)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1578)
        at org.apache.spark.sql.Dataset.selectExpr(Dataset.scala:1612)
        at org.apache.spark.sql.Dataset.selectExpr(Dataset.scala:1611)
        at com.aaa.etl.processor.Test_CSV.main(Test_CSV.java:53)

 

I think the schema is not generated correctly; instead, the struct fields end up with the default column names _c0, _c1, _c2, and so on.
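
If I understand correctly, schema_of_csv treats its argument as a sample data row rather than a header line, so a small probe like the one below (a minimal sketch, run against the same SparkSession) would show the default field names:

// Hypothetical probe: schema_of_csv infers types from a sample row, so the
// header tokens are treated as string values and the fields are named _c0.._c7,
// which matches the FIELD_NOT_FOUND error above.
spark.range(1)
     .select(schema_of_csv("date,value,id,title,state,frequency_short,units_short,seasonal_adjustment_short").as("ddl"))
     .show(false);
// Prints something like: STRUCT<_c0: STRING, _c1: STRING, ..., _c7: STRING>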

Kindly inform me how to generate the schema correctly with the schema_of_csv function, whose parameter type is a string.

 

1 ACCEPTED SOLUTION

aupres
New Contributor III

@Retired_mod wrote:

Hi @aupres, to generate a schema with org.apache.spark.sql.functions.schema_of_csv, you can use the following syntax:

scala
import org.apache.spark.sql.functions.schema_of_csv

val csvData = "1,abc"
val schema = schema_of_csv(csvData)

This will generate a schema for the CSV data in DDL format. You can then use this schema to define the schema of a DataFrame or a table.

Sources: https://docs.databricks.com/sql/language-manual/functions/schema_of_csv.html


Thanks for your reply. I am afraid your code is a Scala source. Kindly inform me of a Java-written example. The schema_of_csv function still throws the exception with the above schema string.
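
For reference, the direct Java counterpart of that Scala snippet would look roughly like this (a sketch, using the same static import style as my code above):

import static org.apache.spark.sql.functions.schema_of_csv;

import org.apache.spark.sql.Column;

// Java counterpart of the quoted Scala example: schema_of_csv expects a sample
// data row such as "1,abc" and returns the inferred schema in DDL form.
Column schema = schema_of_csv("1,abc");
// The inferred fields get default names, e.g. something like STRUCT<_c0: INT, _c1: STRING>,
// which is why passing my header line does not produce named columns like `date`.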


2 REPLIES


aupres
New Contributor III

I used the org.apache.spark.sql.functions.lit method and solved this issue. Thank you anyway.
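
Roughly how I applied it, for anyone who hits the same error (a sketch; the DDL string and the field types are my own assumption, adjust them to your data):

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;
import static org.apache.spark.sql.functions.lit;

// Wrap a DDL-formatted schema string in lit() and hand it to from_csv, so the
// struct fields keep the real column names instead of the default _c0.._c7.
// The types here are illustrative assumptions.
Column ddlSchema = lit("date STRING, value STRING, id STRING, title STRING, "
        + "state STRING, frequency_short STRING, units_short STRING, seasonal_adjustment_short STRING");

Dataset<Row> parsed = df.select(from_csv(col("column"), ddlSchema, options).as("entityStoredPojo"))
        .selectExpr("entityStoredPojo.*");

With the named fields in place, selecting entityStoredPojo.date and the other columns resolves as expected.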
