Data Engineering
Getting error "java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException" while writing streaming data to Event Hubs. It works fine if I write the same data to another Databricks table.

Rahul_Tiwary
New Contributor II

import org.apache.spark.sql._
import scala.collection.JavaConverters._
import com.microsoft.azure.eventhubs._
import java.util.concurrent._
import scala.collection.immutable._
import org.apache.spark.eventhubs._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._

var testConnectionstr = "Connection string"

val parameters = EventHubsConf(testConnectionstr).setMaxEventsPerTrigger(5)

val df = spark.readStream.format("delta").table("gold.redemption")

val ds = df
  .selectExpr("RedemptionId", "ProgramId", "ClaimsPK_CL_FILEID")
  .writeStream
  .format("eventhubs")
  .options(parameters.toMap)
  .option("startingOffsets", "latest")
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start()

Error log:

java.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V

  at org.apache.spark.sql.eventhubs.EventHubsWriter$.$anonfun$validateQuery$2(EventHubsWriter.scala:53)
  at scala.Option.getOrElse(Option.scala:189)
  at org.apache.spark.sql.eventhubs.EventHubsWriter$.validateQuery(EventHubsWriter.scala:53)
  at org.apache.spark.sql.eventhubs.EventHubsWriter$.write(EventHubsWriter.scala:70)
  at org.apache.spark.sql.eventhubs.EventHubsSink.addBatch(EventHubsSink.scala:39)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:805)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:240)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:388)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:187)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:973)
  at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:338)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:803)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:320)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:318)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:803)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$5(MicroBatchExecution.scala:339)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:904)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$2(MicroBatchExecution.scala:336)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)


2 REPLIES

Kaniz
Community Manager

Hi @Rahul Tiwary, can you re-check the path to the checkpoint directory in your actual code?
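For example, a sketch with a concrete, persistent checkpoint path (the DBFS location below is illustrative, not from the original post; df and parameters are reused from the question):

// Illustrative sketch: identical to the original write, except the relative
// placeholder checkpoint path is replaced with a persistent DBFS location.
val query = df
  .selectExpr("RedemptionId", "ProgramId", "ClaimsPK_CL_FILEID")
  .writeStream
  .format("eventhubs")
  .options(parameters.toMap)
  .option("checkpointLocation", "dbfs:/checkpoints/redemption-to-eventhubs")
  .start()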

Gepap
New Contributor II

The dataframe to write needs to have the following schema:

Column                   | Type
-------------------------|-----------------
body (required)          | string or binary
partitionId (optional)   | string
partitionKey (optional)  | string

This worked for me (PySpark version):

from pyspark.sql import functions as F

# Serialize all columns into a single JSON string column named "body",
# which is the column the Event Hubs sink expects.
# `ehconf` is the Event Hubs configuration dict (connection string, etc.).
df.withColumn('body', F.to_json(
        F.struct(*df.columns),
        options={"ignoreNullFields": False})) \
    .select('body') \
    .write \
    .format("eventhubs") \
    .options(**ehconf) \
    .save()
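Applied to the original Scala streaming query, a minimal sketch might look like this (it reuses df and parameters from the question; to_json and struct are standard Spark SQL functions):

// Sketch only: serializes the selected columns into the required "body"
// column before the streaming write, since EventHubsWriter.validateQuery
// rejects a DataFrame without one.
val query = df
  .selectExpr("to_json(struct(RedemptionId, ProgramId, ClaimsPK_CL_FILEID)) AS body")
  .writeStream
  .format("eventhubs")
  .options(parameters.toMap)
  .option("checkpointLocation", "path/to/checkpoint/dir")
  .start()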
