07-25-2023 04:06 AM
Hello, I use Spark 3.4.1 with Hadoop 3 on Windows 11, and I am struggling to generate the schema of CSV data with the schema_of_csv function. Below is my Java code.
Map<String, String> kafkaParams = new HashMap<>();
kafkaParams.put("kafka.bootstrap.servers", "localhost:9092");
kafkaParams.put("subscribe", "topic_unempl_ann");
kafkaParams.put("startingOffsets", "earliest");
SparkSession spark = SparkSession.builder().master("local[*]").appName("kafka_test").getOrCreate();
Dataset<Row> df = spark.read().format("kafka")
        .options(kafkaParams)
        .load()
        .selectExpr("CAST(value AS STRING) as column")
        .filter(not(col("column").startsWith("date")));
Map<String, String> options = new HashMap<String, String>();
options.put("delimiter", ",");
options.put("header", "true");
options.put("inferSchema", "true");
String schemaString = "date,value,id,title,state,frequency_short,units_short,seasonal_adjustment_short";
Column schema = schema_of_csv(schemaString);
Dataset<Row> parsed = df
        .select(from_csv(col("column"), schema, options).as("entityStoredPojo"))
        .selectExpr("entityStoredPojo.date", "entityStoredPojo.value", "entityStoredPojo.id",
                "entityStoredPojo.title", "entityStoredPojo.state", "entityStoredPojo.frequency_short",
                "entityStoredPojo.units_short", "entityStoredPojo.seasonal_adjustment_short")
        .toDF();
But this code throws the following error:
Exception in thread "main" org.apache.spark.sql.AnalysisException: [FIELD_NOT_FOUND] No such struct field `date` in `_c0`, `_c1`, `_c2`, `_c3`, `_c4`, `_c5`, `_c6`, `_c7`.; line 1 pos 0
at org.apache.spark.sql.errors.QueryCompilationErrors$.noSuchStructFieldInGivenFieldsError(QueryCompilationErrors.scala:2115)
at org.apache.spark.sql.catalyst.expressions.ExtractValue$.findField(complexTypeExtractors.scala:83)
at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:56)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.$anonfun$resolve$3(package.scala:363)
at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:169)
at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:165)
at scala.collection.immutable.List.foldLeft(List.scala:79)
at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:362)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:115)
at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpressionByPlanChildren$2(ColumnResolutionHelper.scala:378)
at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$3(ColumnResolutionHelper.scala:158)
at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:100)
at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$1(ColumnResolutionHelper.scala:165)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.innerResolve$1(ColumnResolutionHelper.scala:136)
at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpression(ColumnResolutionHelper.scala:195)
at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren(ColumnResolutionHelper.scala:385)
at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren$(ColumnResolutionHelper.scala:358)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.resolveExpressionByPlanChildren(Analyzer.scala:1504)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.$anonfun$applyOrElse$110(Analyzer.scala:1638)
at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:75)
at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:35)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1638)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1529)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:31)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1529)
at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1504)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:169)
at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:165)
at scala.collection.immutable.List.foldLeft(List.scala:79)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:333)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:4196)
at org.apache.spark.sql.Dataset.select(Dataset.scala:1578)
at org.apache.spark.sql.Dataset.selectExpr(Dataset.scala:1612)
at org.apache.spark.sql.Dataset.selectExpr(Dataset.scala:1611)
at com.aaa.etl.processor.Test_CSV.main(Test_CSV.java:53)
I think the schema is not generated correctly; instead, the default column names like _c0, _c1, _c2 are used. Please let me know how to generate the schema correctly with the schema_of_csv function, whose parameter type is a string.
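For reference, a minimal sketch (an illustrative addition, not part of the original post) of why the error mentions _c0 through _c7: schema_of_csv infers the schema from a sample data row and assigns positional field names, so passing the header line produces a struct of string fields named _c0.._c7 rather than fields named after the headers.

import static org.apache.spark.sql.functions.schema_of_csv;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().master("local[*]").appName("schema_probe").getOrCreate();

// Passing the header line as if it were data: every token is inferred as STRING
// and the fields are named positionally (_c0, _c1, ...), not "date", "value", ...
Column inferred = schema_of_csv("date,value,id,title,state,frequency_short,units_short,seasonal_adjustment_short");
spark.range(1).select(inferred.as("csv_schema")).show(false);
// Expected to print something like: STRUCT<_c0: STRING, _c1: STRING, ..., _c7: STRING>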
07-26-2023 05:15 AM
@Retired_mod wrote: Hi @aupres, to generate a schema with org.apache.spark.sql.functions.schema_of_csv, you can use the following syntax:

import org.apache.spark.sql.functions.schema_of_csv
val csvData = "1,abc"
val schema = schema_of_csv(csvData)

This will generate a schema for the CSV data in DDL format. You can then use this schema to define the schema of a DataFrame or a table.
Source: https://docs.databricks.com/sql/language-manual/functions/schema_of_csv.html
Thanks for your reply. I'm afraid your code is Scala; could you provide a Java example? The schema_of_csv function still throws the exception with the schema string above.
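For completeness, a rough Java equivalent of the quoted Scala snippet (a sketch only; the sample row "1,abc" comes from the reply above):

import static org.apache.spark.sql.functions.lit;
import static org.apache.spark.sql.functions.schema_of_csv;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().master("local[*]").appName("schema_of_csv_java").getOrCreate();

// schema_of_csv accepts either a String literal or a foldable string Column;
// it expects example CSV *data*, not a header line.
Column csvData = lit("1,abc");
Column schema = schema_of_csv(csvData);

// Inspect the inferred schema in DDL-like form.
spark.range(1).select(schema.as("schema")).show(false);  // e.g. STRUCT<_c0: INT, _c1: STRING>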
07-28-2023 01:30 AM
I used the org.apache.spark.sql.functions.lit method to solve this issue. Thank you anyway.
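For readers hitting the same issue, a rough sketch of what the lit-based approach can look like: pass an explicit DDL schema string, wrapped in lit, to from_csv so the struct fields carry the real column names. The column types below are assumptions for illustration, not taken from the original post.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_csv;
import static org.apache.spark.sql.functions.lit;

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Map<String, String> options = new HashMap<>();
options.put("delimiter", ",");

// Explicit DDL schema: real column names, assumed types.
String ddl = "date STRING, value STRING, id STRING, title STRING, state STRING, "
        + "frequency_short STRING, units_short STRING, seasonal_adjustment_short STRING";

Dataset<Row> parsed = df
        .select(from_csv(col("column"), lit(ddl), options).as("entityStoredPojo"))
        .selectExpr("entityStoredPojo.*");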