<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: How to generate schema with org.apache.spark.sql.functions.schema_of_csv? in Get Started Discussions</title>
    <link>https://community.databricks.com/t5/get-started-discussions/how-to-generate-schema-with-org-apache-spark-sql-functions/m-p/38639#M5547</link>
    <description>&lt;P&gt;I used the org.apache.spark.sql.functions.lit method and solved this issue. Thank you anyway.&lt;/P&gt;</description>
    <pubDate>Fri, 28 Jul 2023 08:30:46 GMT</pubDate>
    <dc:creator>aupres</dc:creator>
    <dc:date>2023-07-28T08:30:46Z</dc:date>
    <item>
      <title>How to generate schema with org.apache.spark.sql.functions.schema_of_csv?</title>
      <link>https://community.databricks.com/t5/get-started-discussions/how-to-generate-schema-with-org-apache-spark-sql-functions/m-p/38366#M5544</link>
      <description>&lt;P&gt;Hello, I use Spark 3.4.1-hadoop3 on Windows 11, and I am struggling to generate the schema of CSV data with the schema_of_csv function. Below is my Java code.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;Map&amp;lt;String, String&amp;gt; kafkaParams = new HashMap&amp;lt;&amp;gt;();
kafkaParams.put("kafka.bootstrap.servers", "localhost:9092");
kafkaParams.put("subscribe", "topic_unempl_ann");
kafkaParams.put("startingOffsets", "earliest");

SparkSession spark = SparkSession.builder().master("local[*]").appName("kafka_test").getOrCreate();
Dataset&amp;lt;Row&amp;gt; df = spark.read().format("kafka")
            .options(kafkaParams)
            .load()
            .selectExpr("CAST(value AS STRING) as column").filter(not(col("column").startsWith("date")));

Map&amp;lt;String, String&amp;gt; options = new HashMap&amp;lt;String, String&amp;gt;();
options.put("delimiter", ",");
options.put("header", "true");
options.put("inferSchema", "true");
         
String schemaString = "date,value,id,title,state,frequency_short,units_short,seasonal_adjustment_short";

Column schema = schema_of_csv(schemaString);

Dataset&amp;lt;Row&amp;gt; parsed = df.select(from_csv(col("column"), schema, options).as("entityStoredPojo"))
                 .selectExpr("entityStoredPojo.date", "entityStoredPojo.value", "entityStoredPojo.id", 
                        "entityStoredPojo.title", "entityStoredPojo.state", "entityStoredPojo.frequency_short", 
                        "entityStoredPojo.units_short", "entityStoredPojo.seasonal_adjustment_short").toDF();&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;But these codes throw errors like below,&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;LI-CODE lang="java"&gt;Exception in thread "main" org.apache.spark.sql.AnalysisException: [FIELD_NOT_FOUND] No such struct field `date` in `_c0`, `_c1`, `_c2`, `_c3`, `_c4`, `_c5`, `_c6`, `_c7`.; line 1 pos 0
        at org.apache.spark.sql.errors.QueryCompilationErrors$.noSuchStructFieldInGivenFieldsError(QueryCompilationErrors.scala:2115)
        at org.apache.spark.sql.catalyst.expressions.ExtractValue$.findField(complexTypeExtractors.scala:83)
        at org.apache.spark.sql.catalyst.expressions.ExtractValue$.apply(complexTypeExtractors.scala:56)
        at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.$anonfun$resolve$3(package.scala:363)
        at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:169)
        at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:165)
        at scala.collection.immutable.List.foldLeft(List.scala:79)
        at org.apache.spark.sql.catalyst.expressions.package$AttributeSeq.resolve(package.scala:362)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:115)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpressionByPlanChildren$2(ColumnResolutionHelper.scala:378)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$3(ColumnResolutionHelper.scala:158)
        at org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:100)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.$anonfun$resolveExpression$1(ColumnResolutionHelper.scala:165)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.innerResolve$1(ColumnResolutionHelper.scala:136)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpression(ColumnResolutionHelper.scala:195)
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren(ColumnResolutionHelper.scala:385)      
        at org.apache.spark.sql.catalyst.analysis.ColumnResolutionHelper.resolveExpressionByPlanChildren$(ColumnResolutionHelper.scala:358)     
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.resolveExpressionByPlanChildren(Analyzer.scala:1504)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.$anonfun$applyOrElse$110(Analyzer.scala:1638)    
        at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:75)
        at scala.collection.immutable.ArraySeq.map(ArraySeq.scala:35)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1638)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$16.applyOrElse(Analyzer.scala:1529)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)        
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)        
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:31)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1529)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:1504)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
        at scala.collection.LinearSeqOps.foldLeft(LinearSeq.scala:169)
        at scala.collection.LinearSeqOps.foldLeft$(LinearSeq.scala:165)
        at scala.collection.immutable.List.foldLeft(List.scala:79)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
        at scala.collection.immutable.List.foreach(List.scala:333)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:228)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:224)
        at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:224)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:88)
        at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
        at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:76)
        at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:111)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:202)
        at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:526)
        at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:202)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:201)
        at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$1(Dataset.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:88)
        at org.apache.spark.sql.Dataset.withPlan(Dataset.scala:4196)
        at org.apache.spark.sql.Dataset.select(Dataset.scala:1578)
        at org.apache.spark.sql.Dataset.selectExpr(Dataset.scala:1612)
        at org.apache.spark.sql.Dataset.selectExpr(Dataset.scala:1611)
        at com.aaa.etl.processor.Test_CSV.main(Test_CSV.java:53)&lt;/LI-CODE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;I think the schema is not generated correctly; instead, it contains default column names like _c0, _c1, _c2.&lt;/P&gt;&lt;P&gt;Kindly tell me how to generate the schema correctly with the schema_of_csv function, whose parameter type is string.&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Tue, 25 Jul 2023 11:06:54 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/how-to-generate-schema-with-org-apache-spark-sql-functions/m-p/38366#M5544</guid>
      <dc:creator>aupres</dc:creator>
      <dc:date>2023-07-25T11:06:54Z</dc:date>
    </item>
    <item>
      <title>Re: How to generate schema with org.apache.spark.sql.functions.schema_of_csv?</title>
      <link>https://community.databricks.com/t5/get-started-discussions/how-to-generate-schema-with-org-apache-spark-sql-functions/m-p/38489#M5546</link>
      <description>&lt;BLOCKQUOTE&gt;&lt;HR /&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/9"&gt;@Retired_mod&lt;/a&gt;&amp;nbsp;wrote:&lt;BR /&gt;&lt;P&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/85281"&gt;@aupres&lt;/a&gt;,&amp;nbsp;&lt;SPAN&gt;To generate schema with&amp;nbsp;&lt;/SPAN&gt;org.apache.spark.sql.functions.schema_of_csv&lt;SPAN&gt;, you can use the following syntax:&lt;/SPAN&gt;&lt;/P&gt;&lt;PRE&gt;scala&lt;BR /&gt;import org.apache.spark.sql.functions.schema_of_csv&lt;BR /&gt;&lt;BR /&gt;val csvData = "1,abc"&lt;BR /&gt;val schema = schema_of_csv(csvData)&lt;/PRE&gt;&lt;P&gt;&lt;SPAN&gt;This will generate a schema for the CSV data in DDL format. You can then use this schema to define the schema of a DataFrame or a table.&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;Sources:&amp;nbsp;&lt;/SPAN&gt;&lt;A href="https://docs.databricks.com/sql/language-manual/functions/schema_of_csv.html" target="_blank" rel="noopener noreferrer"&gt;https://docs.databricks.com/sql/language-manual/functions/schema_of_csv.html&lt;/A&gt;&lt;/P&gt;&lt;HR /&gt;&lt;/BLOCKQUOTE&gt;&lt;P&gt;Thanks for your reply. I am afraid your code is written in Scala. Could you please provide an example written in Java? The schema_of_csv function still throws the exception with the above schema string.&lt;/P&gt;</description>
      <pubDate>Wed, 26 Jul 2023 12:15:01 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/how-to-generate-schema-with-org-apache-spark-sql-functions/m-p/38489#M5546</guid>
      <dc:creator>aupres</dc:creator>
      <dc:date>2023-07-26T12:15:01Z</dc:date>
    </item>
    <item>
      <title>Re: How to generate schema with org.apache.spark.sql.functions.schema_of_csv?</title>
      <link>https://community.databricks.com/t5/get-started-discussions/how-to-generate-schema-with-org-apache-spark-sql-functions/m-p/38639#M5547</link>
      <description>&lt;P&gt;I used the org.apache.spark.sql.functions.lit method and solved this issue. Thank you anyway.&lt;/P&gt;</description>
      <pubDate>Fri, 28 Jul 2023 08:30:46 GMT</pubDate>
      <guid>https://community.databricks.com/t5/get-started-discussions/how-to-generate-schema-with-org-apache-spark-sql-functions/m-p/38639#M5547</guid>
      <dc:creator>aupres</dc:creator>
      <dc:date>2023-07-28T08:30:46Z</dc:date>
    </item>
  </channel>
</rss>

