<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>topic Re: Ingest Data into Databricks with Kafka in Data Engineering</title>
    <link>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/44005#M27589</link>
    <description>&lt;P&gt;I get stuck with this as well:&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="Screenshot 2023-09-07 at 14.37.39.png" style="width: 200px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/3596i32446190F96BEA05/image-size/small/is-moderation-mode/true?v=v2&amp;amp;px=200" role="button" title="Screenshot 2023-09-07 at 14.37.39.png" alt="Screenshot 2023-09-07 at 14.37.39.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;Could it be a cluster memory problem, or a network issue with the connection to the virtual machine?&lt;/P&gt;</description>
    <pubDate>Thu, 07 Sep 2023 13:39:29 GMT</pubDate>
    <dc:creator>Pbarbosa154</dc:creator>
    <dc:date>2023-09-07T13:39:29Z</dc:date>
    <item>
      <title>Ingest Data into Databricks with Kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/43479#M27508</link>
      <description>&lt;P&gt;I am trying to ingest data into Databricks with Kafka. Kafka is installed on a virtual machine, where the data I need is already stored as JSON in a Kafka topic. In Databricks, I have the following code:&lt;/P&gt;&lt;PRE&gt;df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "&amp;lt;VM_IP:9092&amp;gt;")
  .option("subscribe", "&amp;lt;topicName&amp;gt;")
  .load()
)&lt;/PRE&gt;&lt;P&gt;The printed schema is:&lt;/P&gt;&lt;PRE&gt; |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)&lt;/PRE&gt;&lt;P&gt;Then I try to write the data to a Delta table, but that code only outputs 'Stream Initializing' and gets stuck there.&lt;/P&gt;&lt;P&gt;I would appreciate some help, because I cannot figure out what I am doing wrong or missing.&lt;/P&gt;</description>
      <pubDate>Mon, 04 Sep 2023 14:45:54 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/43479#M27508</guid>
      <dc:creator>Pbarbosa154</dc:creator>
      <dc:date>2023-09-04T14:45:54Z</dc:date>
    </item>
    <item>
      <title>Re: Ingest Data into Databricks with Kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/43491#M27512</link>
      <description>&lt;P&gt;&lt;SPAN&gt;Hi&amp;nbsp;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/9"&gt;@Retired_mod&lt;/a&gt;, thanks for the answer. But I have a checkpoint location when writing. This is the code:&lt;/SPAN&gt;&lt;/P&gt;&lt;LI-CODE lang="python"&gt;delta_table_path = "/mnt/delta-table-path"

df.writeStream \
 .format("delta") \
 .outputMode("append") \
 .option("checkpointLocation", "/mnt/checkpoint-location") \
 .start(delta_table_path)&lt;/LI-CODE&gt;</description>
      <pubDate>Mon, 04 Sep 2023 15:20:25 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/43491#M27512</guid>
      <dc:creator>Pbarbosa154</dc:creator>
      <dc:date>2023-09-04T15:20:25Z</dc:date>
    </item>
    <item>
      <title>Re: Ingest Data into Databricks with Kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/43503#M27514</link>
      <description>&lt;P&gt;What about using hivestore in Databricks? And maybe that's an issue, but I tried to set up this pipeline to process only one message and it still got stuck at 'Stream Initializing'.&lt;/P&gt;</description>
      <pubDate>Mon, 04 Sep 2023 15:34:15 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/43503#M27514</guid>
      <dc:creator>Pbarbosa154</dc:creator>
      <dc:date>2023-09-04T15:34:15Z</dc:date>
    </item>
    <item>
      <title>Re: Ingest Data into Databricks with Kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/43902#M27571</link>
      <description>&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/55220"&gt;@Pbarbosa154&lt;/a&gt;&amp;nbsp;&lt;/P&gt;&lt;P&gt;Could we try &lt;EM&gt;&lt;STRONG&gt;display(df)&lt;/STRONG&gt;&lt;/EM&gt; after the readStream to see whether we are able to read data from Kafka? This will help us rule out Kafka read issues.&lt;/P&gt;</description>
      <pubDate>Thu, 07 Sep 2023 05:16:16 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/43902#M27571</guid>
      <dc:creator>Tharun-Kumar</dc:creator>
      <dc:date>2023-09-07T05:16:16Z</dc:date>
    </item>
    <item>
      <title>Re: Ingest Data into Databricks with Kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/44005#M27589</link>
      <description>&lt;P&gt;I get stuck with this as well:&lt;/P&gt;&lt;P&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="Screenshot 2023-09-07 at 14.37.39.png" style="width: 200px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/3596i32446190F96BEA05/image-size/small/is-moderation-mode/true?v=v2&amp;amp;px=200" role="button" title="Screenshot 2023-09-07 at 14.37.39.png" alt="Screenshot 2023-09-07 at 14.37.39.png" /&gt;&lt;/span&gt;&lt;/P&gt;&lt;P&gt;Could it be a cluster memory problem, or a network issue with the connection to the virtual machine?&lt;/P&gt;</description>
      <pubDate>Thu, 07 Sep 2023 13:39:29 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/44005#M27589</guid>
      <dc:creator>Pbarbosa154</dc:creator>
      <dc:date>2023-09-07T13:39:29Z</dc:date>
    </item>
    <item>
      <title>Re: Ingest Data into Databricks with Kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/44140#M27607</link>
      <description>&lt;P&gt;You need to check the driver logs while your stream is initializing. Please check the log4j output in the driver logs. If there is an issue connecting to your Kafka broker, you will see it there.&lt;/P&gt;</description>
      <pubDate>Fri, 08 Sep 2023 22:41:46 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/44140#M27607</guid>
      <dc:creator>jose_gonzalez</dc:creator>
      <dc:date>2023-09-08T22:41:46Z</dc:date>
    </item>
    <item>
      <title>Re: Ingest Data into Databricks with Kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/44310#M27630</link>
      <description>&lt;P&gt;Yeah, in fact, when checking the log4j logs I see the following:&lt;/P&gt;&lt;PRE&gt;23/09/11 09:11:27 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-3e9266a6-081d-4946-b41e-38873d2b01c0--1036396469-driver-0-1, groupId=spark-kafka-source-3e9266a6-081d-4946-b41e-38873d2b01c0--1036396469-driver-0] Bootstrap broker VM_IP (id: -1 rack: null) disconnected&lt;/PRE&gt;&lt;P&gt;I added 'listeners = PLAINTEXT://VM_IP:9092' to the Kafka config (a solution I found when searching for the issue), but I am still having issues when trying to connect to the VM.&lt;/P&gt;</description>
      <pubDate>Mon, 11 Sep 2023 09:21:21 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/44310#M27630</guid>
      <dc:creator>Pbarbosa154</dc:creator>
      <dc:date>2023-09-11T09:21:21Z</dc:date>
    </item>
    <item>
      <title>Re: Ingest Data into Databricks with Kafka</title>
      <link>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/44313#M27631</link>
      <description>&lt;P&gt;Update: After changing the IP address to the external IP of the machine, I get:&lt;/P&gt;&lt;PRE&gt;23/09/11 10:14:47 INFO AppInfoParser: Kafka version: 7.4.0-ccs
23/09/11 10:14:47 INFO AppInfoParser: Kafka commitId: 30969fa33c185e88
23/09/11 10:14:47 INFO AppInfoParser: Kafka startTimeMs: 1694427287346
23/09/11 10:14:47 INFO KafkaConsumer: [Consumer clientId=consumer-spark-kafka-source-51917966-dd8d-4b6b-9532-6076a916ea5b-998856815-driver-0-1, groupId=spark-kafka-source-51917966-dd8d-4b6b-9532-6076a916ea5b-998856815-driver-0] Subscribed to topic(s): &amp;lt;topicName&amp;gt;&lt;/PRE&gt;&lt;P&gt;But soon after it closes the connection again...&lt;/P&gt;</description>
      <pubDate>Mon, 11 Sep 2023 10:19:51 GMT</pubDate>
      <guid>https://community.databricks.com/t5/data-engineering/ingest-data-into-databricks-with-kafka/m-p/44313#M27631</guid>
      <dc:creator>Pbarbosa154</dc:creator>
      <dc:date>2023-09-11T10:19:51Z</dc:date>
    </item>
  </channel>
</rss>

