import org.apache.spark.sql._
import scala.collection.JavaConverters._
import com.microsoft.azure.eventhubs._
import java.util.concurrent._
import scala.collection.immutable._
import org.apache.spark.eventhubs._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import org.apache.spark.sql.streaming.{OutputMode, Trigger}
import scala.concurrent.duration._
var testConnectionstr = "Connection string"
val parameters = EventHubsConf(testConnectionstr).setMaxEventsPerTrigger(5)
val df = spark.readStream.format("delta").table("gold.redemption")
val ds = df
.selectExpr("RedemptionId", "ProgramId","ClaimsPK_CL_FILEID")
.writeStream
.format("eventhubs")
.options(parameters.toMap)
.option("startingOffsets", "latest")
.option("checkpointLocation", "path/to/checkpoint/dir")
.start()
Error Log --
ava.lang.NoSuchMethodError: org.apache.spark.sql.AnalysisException.<init>(Ljava/lang/String;Lscala/Option;Lscala/Option;Lscala/Option;Lscala/Option;)V
at org.apache.spark.sql.eventhubs.EventHubsWriter$.$anonfun$validateQuery$2(EventHubsWriter.scala:53)
at scala.Option.getOrElse(Option.scala:189)
at org.apache.spark.sql.eventhubs.EventHubsWriter$.validateQuery(EventHubsWriter.scala:53)
at org.apache.spark.sql.eventhubs.EventHubsWriter$.write(EventHubsWriter.scala:70)
at org.apache.spark.sql.eventhubs.EventHubsSink.addBatch(EventHubsSink.scala:39)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$17(MicroBatchExecution.scala:805)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$8(SQLExecution.scala:240)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:388)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:187)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:973)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:142)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:338)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$16(MicroBatchExecution.scala:803)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:320)
at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:318)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:73)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:803)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$5(MicroBatchExecution.scala:339)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.withSchemaEvolution(MicroBatchExecution.scala:904)
at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStreamWithListener$2(MicroBatchExecution.scala:336)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
.options(parameters.toMap)
.option("startingOffsets", "latest")
.option("checkpointLocation", "path/to/checkpoint/dir")
.start()