<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Streaming data from delta table to eventhub after merging data - getting timeout error!! in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39443#M26981</link>
    <description>&lt;P&gt;and also try with &lt;SPAN class=""&gt;maxBytesPerTrigger&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
    <pubDate>Wed, 09 Aug 2023 12:57:13 GMT</pubDate>
    <dc:creator>-werners-</dc:creator>
    <dc:date>2023-08-09T12:57:13Z</dc:date>
    <item>
      <title>Streaming data from delta table to eventhub after merging data - getting timeout error!!</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39404#M26964</link>
      <description>&lt;P&gt;Here is my code to write data from a delta table to event hub (from where consumer team will consume data):&lt;/P&gt;&lt;PRE&gt;&lt;SPAN class=""&gt;import&lt;/SPAN&gt; org.apache.spark.eventhubs._  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; org.apache.spark.sql.streaming.&lt;SPAN class=""&gt;Trigger&lt;/SPAN&gt;._  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; org.apache.spark.sql.types._  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; org.apache.spark.sql.functions._  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; java.util.&lt;SPAN class=""&gt;Properties&lt;/SPAN&gt;  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; com.microsoft.azure.eventhubs.{ &lt;SPAN class=""&gt;EventData&lt;/SPAN&gt;, &lt;SPAN class=""&gt;PartitionSender&lt;/SPAN&gt; }  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; org.apache.spark.eventhubs.&lt;SPAN class=""&gt;EventHubsConf&lt;/SPAN&gt;  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; io.delta.tables._  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; org.apache.spark.sql.streaming.&lt;SPAN class=""&gt;Trigger&lt;/SPAN&gt;  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; java.io.&lt;SPAN class=""&gt;PrintWriter&lt;/SPAN&gt;  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; java.time.&lt;SPAN class=""&gt;ZonedDateTime&lt;/SPAN&gt;  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; java.time.format.&lt;SPAN class=""&gt;DateTimeFormatter&lt;/SPAN&gt;  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; scala.concurrent.duration._  
    &lt;SPAN class=""&gt;import&lt;/SPAN&gt; java.nio.file.{&lt;SPAN class=""&gt;Paths&lt;/SPAN&gt;, &lt;SPAN class=""&gt;Files&lt;/SPAN&gt;}    

    &lt;SPAN class=""&gt;// Configure Azure Event Hub details  &lt;/SPAN&gt;
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; namespaceNameOut = &lt;SPAN class=""&gt;"des-ent-prod-cus-stream-eventhub-001"&lt;/SPAN&gt;  
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; eventHubNameOut = &lt;SPAN class=""&gt;"hvc-prodstats-output"&lt;/SPAN&gt;  
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; sasKeyNameOut = &lt;SPAN class=""&gt;"writer"&lt;/SPAN&gt;  
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; sasKeyOut = dbutils.secrets.get(scope=&lt;SPAN class=""&gt;"deskvscope"&lt;/SPAN&gt;, key=&lt;SPAN class=""&gt;"des-ent-prod-cus-stream-e venthub-001-writer"&lt;/SPAN&gt;)  

    &lt;SPAN class=""&gt;// Configure checkpoint and bad data paths&lt;/SPAN&gt;
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; checkpoint_dir_path = &lt;SPAN class=""&gt;"/mnt/hvc-wenco/prodstats/stream/checkpoints"&lt;/SPAN&gt;
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; baddata_path = &lt;SPAN class=""&gt;"/mnt/hvc-wenco/prodstats/stream/Bad_data"&lt;/SPAN&gt;

    &lt;SPAN class=""&gt;// Define timestamp checkpoint path  &lt;/SPAN&gt;
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; tbl_version_timestamp_path = &lt;SPAN class=""&gt;"/mnt/hvc-  wenco/equip_status_history_table/checkpoints/checkpoint"&lt;/SPAN&gt;  

    &lt;SPAN class=""&gt;// Configure other parameters  &lt;/SPAN&gt;
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; &lt;SPAN class=""&gt;MaxEvents&lt;/SPAN&gt; = &lt;SPAN class=""&gt;5000&lt;/SPAN&gt;  

    &lt;SPAN class=""&gt;// Read the last checkpoint timestamp  &lt;/SPAN&gt;
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; last_checkpoint_string = dbutils.fs.head(tbl_version_timestamp_path)  

    &lt;SPAN class=""&gt;// Parse last checkpoint timestamp  &lt;/SPAN&gt;
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; time_format = &lt;SPAN class=""&gt;"yyyy-MM-dd HH:mm:ss.SSSz"&lt;/SPAN&gt;  
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; formatter = &lt;SPAN class=""&gt;DateTimeFormatter&lt;/SPAN&gt;.ofPattern(time_format)  
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; last_checkpoint = &lt;SPAN class=""&gt;ZonedDateTime&lt;/SPAN&gt;.parse(last_checkpoint_string, formatter)  

    &lt;SPAN class=""&gt;// Build connection to Event Hub  &lt;/SPAN&gt;
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; connStrOut = &lt;SPAN class=""&gt;new&lt;/SPAN&gt; com.microsoft.azure.eventhubs.&lt;SPAN class=""&gt;ConnectionStringBuilder&lt;/SPAN&gt;()  
            .setNamespaceName(namespaceNameOut)  
            .setEventHubName(eventHubNameOut)  
            .setSasKeyName(sasKeyNameOut)  
            .setSasKey(sasKeyOut)  
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; ehWriteConf = &lt;SPAN class=""&gt;EventHubsConf&lt;/SPAN&gt;(connStrOut.toString())  

    &lt;SPAN class=""&gt;// Create a streaming dataframe from the Delta table  &lt;/SPAN&gt;
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; &lt;SPAN class=""&gt;InputStreamingDF&lt;/SPAN&gt; =   
      spark  
        .readStream  
        .option(&lt;SPAN class=""&gt;"maxFilesPerTrigger"&lt;/SPAN&gt;, &lt;SPAN class=""&gt;1&lt;/SPAN&gt;)  
        .option(&lt;SPAN class=""&gt;"startingTimestamp"&lt;/SPAN&gt;, last_checkpoint_string)  
        .option(&lt;SPAN class=""&gt;"readChangeFeed"&lt;/SPAN&gt;, &lt;SPAN class=""&gt;"true"&lt;/SPAN&gt;)
        .table(&lt;SPAN class=""&gt;"wencohvc.equip_status_history_table"&lt;/SPAN&gt;)  
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; dropPreTransform = &lt;SPAN class=""&gt;InputStreamingDF&lt;/SPAN&gt;.filter(&lt;SPAN class=""&gt;InputStreamingDF&lt;/SPAN&gt;(&lt;SPAN class=""&gt;"_change_type"&lt;/SPAN&gt;) =!= &lt;SPAN class=""&gt;"update_preimage"&lt;/SPAN&gt;)  
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; operationTransform = dropPreTransform.withColumn(&lt;SPAN class=""&gt;"operation"&lt;/SPAN&gt;, when($&lt;SPAN class=""&gt;"_change_type"&lt;/SPAN&gt; === &lt;SPAN class=""&gt;"insert"&lt;/SPAN&gt;, &lt;SPAN class=""&gt;2&lt;/SPAN&gt;).otherwise(when($&lt;SPAN class=""&gt;"_change_type"&lt;/SPAN&gt; === &lt;SPAN class=""&gt;"update_postimage"&lt;/SPAN&gt;, &lt;SPAN class=""&gt;4&lt;/SPAN&gt;)))  

    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; transformedDF = operationTransform.withColumn(&lt;SPAN class=""&gt;"DeletedIndicator"&lt;/SPAN&gt;, when($&lt;SPAN class=""&gt;"_change_type"&lt;/SPAN&gt; === &lt;SPAN class=""&gt;"delete"&lt;/SPAN&gt;, &lt;SPAN class=""&gt;"Y"&lt;/SPAN&gt;).otherwise(&lt;SPAN class=""&gt;"N"&lt;/SPAN&gt;))  
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; finalDF = transformedDF.drop(&lt;SPAN class=""&gt;"_change_type"&lt;/SPAN&gt;, &lt;SPAN class=""&gt;"_commit_version"&lt;/SPAN&gt;, &lt;SPAN class=""&gt;"_commit_timestamp"&lt;/SPAN&gt;)  

    &lt;SPAN class=""&gt;// Write to Event Hubs with retry and checkpointing  &lt;/SPAN&gt;
    &lt;SPAN class=""&gt;var&lt;/SPAN&gt; retry = &lt;SPAN class=""&gt;true&lt;/SPAN&gt;  
    &lt;SPAN class=""&gt;var&lt;/SPAN&gt; retryCount = &lt;SPAN class=""&gt;0&lt;/SPAN&gt;  
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; maxRetries = &lt;SPAN class=""&gt;3&lt;/SPAN&gt;  

    &lt;SPAN class=""&gt;while&lt;/SPAN&gt; (retry &amp;amp;&amp;amp; retryCount &amp;lt; maxRetries) {  
      &lt;SPAN class=""&gt;try&lt;/SPAN&gt; {  
        &lt;SPAN class=""&gt;val&lt;/SPAN&gt; stream = finalDF  
          .select(to_json(struct(&lt;SPAN class=""&gt;/* column list */&lt;/SPAN&gt;)).alias(&lt;SPAN class=""&gt;"body"&lt;/SPAN&gt;))  
          .writeStream  
          .format(&lt;SPAN class=""&gt;"eventhubs"&lt;/SPAN&gt;)  
          .options(ehWriteConf.toMap)  
          .option(&lt;SPAN class=""&gt;"checkpointLocation"&lt;/SPAN&gt;, checkpoint_dir_path)  
          .trigger(&lt;SPAN class=""&gt;Trigger&lt;/SPAN&gt;.&lt;SPAN class=""&gt;AvailableNow&lt;/SPAN&gt;)  
          .start()  

        stream.awaitTermination()  
        retry = &lt;SPAN class=""&gt;false&lt;/SPAN&gt;  
      } &lt;SPAN class=""&gt;catch&lt;/SPAN&gt; {  
        &lt;SPAN class=""&gt;case&lt;/SPAN&gt; e: &lt;SPAN class=""&gt;Exception&lt;/SPAN&gt; =&amp;gt;  
          retryCount += &lt;SPAN class=""&gt;1&lt;/SPAN&gt;  
          &lt;SPAN class=""&gt;if&lt;/SPAN&gt; (retryCount &amp;lt; maxRetries) {  
            &lt;SPAN class=""&gt;val&lt;/SPAN&gt; delay = &lt;SPAN class=""&gt;2.&lt;/SPAN&gt;seconds * retryCount  
            println(&lt;SPAN class=""&gt;s"Stream attempt &lt;SPAN class=""&gt;$retryCount&lt;/SPAN&gt; failed, retrying in &lt;SPAN class=""&gt;${delay.toSeconds}&lt;/SPAN&gt; seconds..."&lt;/SPAN&gt;)  
            &lt;SPAN class=""&gt;Thread&lt;/SPAN&gt;.sleep(delay.toMillis)  
          }  
      }  
    }  

    &lt;SPAN class=""&gt;// Write checkpoint  &lt;/SPAN&gt;
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; emptyDF = &lt;SPAN class=""&gt;Seq&lt;/SPAN&gt;((&lt;SPAN class=""&gt;1&lt;/SPAN&gt;)).toDF(&lt;SPAN class=""&gt;"seq"&lt;/SPAN&gt;)  
    &lt;SPAN class=""&gt;val&lt;/SPAN&gt; checkpoint_timestamp = emptyDF.withColumn(&lt;SPAN class=""&gt;"current_timestamp"&lt;/SPAN&gt;, current_timestamp()).first().getTimestamp(&lt;SPAN class=""&gt;1&lt;/SPAN&gt;) + &lt;SPAN class=""&gt;"+00:00"&lt;/SPAN&gt;  
    dbutils.fs.put(tbl_version_timestamp_path, checkpoint_timestamp.toString(), &lt;SPAN class=""&gt;true&lt;/SPAN&gt;)   &lt;/PRE&gt;&lt;P&gt;Problem is that it is timing out before the last command and last checkpoint command does not run. I have tried retrying mechanism as well, but still, it timeouts. the data volume is huge from the source, I don't want to stream duplicate data by running the notebook again and again, if proper checkpoint is not happening. how do i solve this issue?? I expect the job to run and store last checkpoint timestamp correctly, so it picks up form there in the next run, but it timeouts before the last command. . The error I am getting is:&lt;/P&gt;&lt;PRE&gt;&lt;SPAN class=""&gt;ERROR&lt;/SPAN&gt;: &lt;SPAN class=""&gt;Some&lt;/SPAN&gt; streams terminated before &lt;SPAN class=""&gt;this&lt;/SPAN&gt; command could finish!
&lt;SPAN class=""&gt;Stream&lt;/SPAN&gt; attempt &lt;SPAN class=""&gt;1&lt;/SPAN&gt; failed, retrying in &lt;SPAN class=""&gt;2&lt;/SPAN&gt; seconds...
&lt;SPAN class=""&gt;Stream&lt;/SPAN&gt; attempt &lt;SPAN class=""&gt;2&lt;/SPAN&gt; failed, retrying in &lt;SPAN class=""&gt;4&lt;/SPAN&gt; seconds...
retry: &lt;SPAN class=""&gt;Boolean&lt;/SPAN&gt; = &lt;SPAN class=""&gt;true&lt;/SPAN&gt;
retryCount: &lt;SPAN class=""&gt;Int&lt;/SPAN&gt; = &lt;SPAN class=""&gt;3&lt;/SPAN&gt;
maxRetries: &lt;SPAN class=""&gt;Int&lt;/SPAN&gt; = &lt;SPAN class=""&gt;3&lt;/SPAN&gt;
&lt;SPAN class=""&gt;ERROR&lt;/SPAN&gt;: &lt;SPAN class=""&gt;Some&lt;/SPAN&gt; streams terminated before &lt;SPAN class=""&gt;this&lt;/SPAN&gt; command could finish!
&lt;SPAN class=""&gt;Command&lt;/SPAN&gt; took &lt;SPAN class=""&gt;0.04&lt;/SPAN&gt; seconds&lt;/PRE&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 09 Aug 2023 01:41:34 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39404#M26964</guid>
      <dc:creator>Ruby8376</dc:creator>
      <dc:date>2023-08-09T01:41:34Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming data from delta table to eventhub after merging data - getting timeout error!!</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39405#M26965</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/771"&gt;@Ryan_Chynoweth&lt;/a&gt;&amp;nbsp;Can you help?&lt;/P&gt;</description>
      <pubDate>Wed, 09 Aug 2023 01:42:06 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39405#M26965</guid>
      <dc:creator>Ruby8376</dc:creator>
      <dc:date>2023-08-09T01:42:06Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming data from delta table to eventhub after merging data - getting timeout error!!</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39406#M26966</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/7691"&gt;@hubert_dudek&lt;/a&gt;&amp;nbsp;Can you help?&lt;/P&gt;</description>
      <pubDate>Wed, 09 Aug 2023 01:44:01 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39406#M26966</guid>
      <dc:creator>Ruby8376</dc:creator>
      <dc:date>2023-08-09T01:44:01Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming data from delta table to eventhub after merging data - getting timeout error!!</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39407#M26967</link>
      <description>&lt;P&gt;@Anonymous&amp;nbsp;can you help?&lt;/P&gt;</description>
      <pubDate>Wed, 09 Aug 2023 01:45:37 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39407#M26967</guid>
      <dc:creator>Ruby8376</dc:creator>
      <dc:date>2023-08-09T01:45:37Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming data from delta table to eventhub after merging data - getting timeout error!!</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39414#M26968</link>
      <description>&lt;P&gt;My first reaction is: why the loop?&lt;BR /&gt;The streaming query should send all data to the sink.&lt;/P&gt;</description>
      <pubDate>Wed, 09 Aug 2023 07:09:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39414#M26968</guid>
      <dc:creator>-werners-</dc:creator>
      <dc:date>2023-08-09T07:09:25Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming data from delta table to eventhub after merging data - getting timeout error!!</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39440#M26979</link>
      <description>&lt;P&gt;yea initially there was no loop. I had to add it for retry mechanism so it does not times out before finishing the write operation.&lt;/P&gt;</description>
      <pubDate>Wed, 09 Aug 2023 12:48:11 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39440#M26979</guid>
      <dc:creator>Ruby8376</dc:creator>
      <dc:date>2023-08-09T12:48:11Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming data from delta table to eventhub after merging data - getting timeout error!!</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39442#M26980</link>
      <description>&lt;P&gt;well get rid of it and let's try to find the cause.&lt;/P&gt;&lt;P&gt;Can you set the maxfilespertrigger to the default (1000)?&lt;/P&gt;&lt;P&gt;Also check the points that Kaniz mentioned.&lt;/P&gt;&lt;P&gt;My feeling is that too much data is processed at once.&lt;/P&gt;</description>
      <pubDate>Wed, 09 Aug 2023 12:53:52 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39442#M26980</guid>
      <dc:creator>-werners-</dc:creator>
      <dc:date>2023-08-09T12:53:52Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming data from delta table to eventhub after merging data - getting timeout error!!</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39443#M26981</link>
      <description>&lt;P&gt;and also try with &lt;SPAN class=""&gt;maxBytesPerTrigger&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Wed, 09 Aug 2023 12:57:13 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39443#M26981</guid>
      <dc:creator>-werners-</dc:creator>
      <dc:date>2023-08-09T12:57:13Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming data from delta table to eventhub after merging data - getting timeout error!!</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39462#M26990</link>
      <description>&lt;P&gt;&lt;SPAN&gt;Job aborted due to stage failure: Task 0 in stage 24.0 failed 4 times, most recent failure: Lost task 0.3 in stage 24.0 (TID 44) (10.188.133.16 executor 0): com.microsoft.azure.eventhubs.TimeoutException: Entity(qb2-testprodstats-output): Send operation timed out at 2023-08-09T16:42:30.177Z[Etc/UTC]., errorContext[NS: des-ent-prod-cus-stream-eventhub-001.servicebus.windows.net, PATH: qb2-testprodstats-output, REFERENCE_ID: 479AC68402153E0B02153E2864D3C08F_G2, LINK_CREDIT: 0]&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at com.microsoft.azure.eventhubs.impl.MessageSender.throwSenderTimeout(MessageSender.java:947)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at com.microsoft.azure.eventhubs.impl.MessageSender.access$1800(MessageSender.java:62)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at com.microsoft.azure.eventhubs.impl.MessageSender$SendTimeout.run(MessageSender.java:1069)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at com.microsoft.azure.eventhubs.impl.Timer$ScheduledTask.onEvent(Timer.java:48)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at com.microsoft.azure.eventhubs.impl.DispatchHandler.onTimerTask(DispatchHandler.java:12)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at org.apache.qpid.proton.engine.BaseHandler.handle(BaseHandler.java:233)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at org.apache.qpid.proton.engine.impl.EventImpl.dispatch(EventImpl.java:108)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at org.apache.qpid.proton.reactor.impl.ReactorImpl.dispatch(ReactorImpl.java:324)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at org.apache.qpid.proton.reactor.impl.ReactorImpl.process(ReactorImpl.java:291)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at com.microsoft.azure.eventhubs.impl.MessagingFactory$RunReactor.run(MessagingFactory.java:784)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at java.util.concurrent.FutureTask.run(FutureTask.java:266)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;at java.lang.Thread.run(Thread.java:750)&lt;/SPAN&gt;&lt;/P&gt;&lt;P&gt;&lt;SPAN&gt;getting this error after making the changes.&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Wed, 09 Aug 2023 16:53:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39462#M26990</guid>
      <dc:creator>Ruby8376</dc:creator>
      <dc:date>2023-08-09T16:53:04Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming data from delta table to eventhub after merging data - getting timeout error!!</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39495#M26996</link>
      <description>&lt;P&gt;Ok, the error message clearly states a timeout on the event hub side.&lt;BR /&gt;So the best way to approach this is to look at event hub config (instead of going through all kinds of tricks in spark).&lt;BR /&gt;Try to set the receiverTimeout/operationTimeout in the connection string, or the maxRatePerPartition.&lt;BR /&gt;For the first write, the whole delta table will be read so event hub should be ready for what is in fact a large batch load.&lt;BR /&gt;I don't have a clear answer on what setting will work in your situation, so you will have to try out some values (perhaps even combined with spark streaming settings as mentioned before).&lt;/P&gt;&lt;P&gt;&lt;A href="https://github.com/Azure/azure-event-hubs-spark/blob/master/docs/spark-streaming-eventhubs-integration.md" target="_self"&gt;event hub config for spark streaming&lt;/A&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 10 Aug 2023 06:54:21 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39495#M26996</guid>
      <dc:creator>-werners-</dc:creator>
      <dc:date>2023-08-10T06:54:21Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming data from delta table to eventhub after merging data - getting timeout error!!</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39517#M27002</link>
      <description>&lt;P&gt;Thank you&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/14792"&gt;@-werners-&lt;/a&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Thu, 10 Aug 2023 14:07:06 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39517#M27002</guid>
      <dc:creator>Ruby8376</dc:creator>
      <dc:date>2023-08-10T14:07:06Z</dc:date>
    </item>
    <item>
      <title>Re: Streaming data from delta table to eventhub after merging data - getting timeout error!!</title>
      <link>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39518#M27003</link>
      <description>&lt;P&gt;Thanks Kaniz!&lt;/P&gt;</description>
      <pubDate>Thu, 10 Aug 2023 14:08:22 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/streaming-data-from-delta-table-to-eventhub-after-merging-data/m-p/39518#M27003</guid>
      <dc:creator>Ruby8376</dc:creator>
      <dc:date>2023-08-10T14:08:22Z</dc:date>
    </item>
  </channel>
</rss>

