I want to read data from a Delta table as a stream and load it into Azure Event Hubs, but the job fails with this error: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 138.0 failed 4 times, most recent failure: Lost task 0.3 in stage 138.0 (TID 177) (10.139.64.4 executor driver): java.util.concurrent.CompletionException: com.microsoft.azure.eventhubs.OperationCancelledException: Entity(anomalydetection-eventhub): send failed while dispatching to Reactor, see cause for more details.
Below is my code:
from pyspark.sql.functions import to_json, struct

# Read the cleaned silver-layer Delta table as a stream.
silver_data = spark.readStream.format("delta").table("anomaydetection_processed.silver_clean_data")

# Fetch the Event Hubs send connection string from the secret scope and
# encrypt it, as the azure-event-hubs-spark connector requires.
raw_connection_string = dbutils.secrets.get(scope="anomalydetect-scope", key="connection-string-eventhub-send")
ehConf = {}
ehConf['eventhubs.connectionString'] = spark._sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(raw_connection_string)

# The Event Hubs sink expects a single string column named "body".
output_df = silver_data.select(to_json(struct("*")).alias("body"))

output_df.writeStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .option("checkpointLocation", "/mnt/anomalydetection2/presentation/eventhub_checkpoint") \
    .start()
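
For what it's worth, one thing I plan to rule out is the connection string itself, since the exception comes from the Event Hubs client rather than from Spark. Below is a minimal sketch of a one-off send test outside Spark, assuming the azure-eventhub Python package is installed on the cluster; if the secret's connection string does not already include an EntityPath, the eventhub_name argument would need to be passed explicitly (the entity name here is taken from the error message):

from azure.eventhub import EventHubProducerClient, EventData

# Same secret as in the streaming job above.
raw_connection_string = dbutils.secrets.get(scope="anomalydetect-scope", key="connection-string-eventhub-send")

# Pass eventhub_name only if the connection string lacks an EntityPath.
producer = EventHubProducerClient.from_connection_string(
    conn_str=raw_connection_string,
    eventhub_name="anomalydetection-eventhub",
)
try:
    batch = producer.create_batch()
    batch.add(EventData('{"ping": 1}'))
    producer.send_batch(batch)
    print("test send succeeded")
finally:
    producer.close()

If this standalone send also fails, that would point to the connection string, firewall, or namespace configuration rather than the Spark job.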