Ingest Data into Databricks with Kafka

Pbarbosa154
New Contributor III

I am trying to ingest data into Databricks with Kafka. I have Kafka installed on a virtual machine, where the data I need is already stored as JSON in a Kafka topic. In Databricks, I have the following code:

```
# Read from the Kafka topic as a streaming DataFrame
df = (spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<VM_IP>:9092")
  .option("subscribe", "<topicName>")
  .load()
)
```

The printed schema gives me:

```
root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)
```
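Since `key` and `value` arrive as binary, the JSON payload has to be cast and parsed before use. A minimal sketch (the payload fields below are hypothetical placeholders, not from my actual topic):

```
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical payload schema -- replace with the real fields of your JSON messages
payload_schema = StructType([
    StructField("id", LongType()),
    StructField("name", StringType()),
])

parsed = (df
  .select(col("value").cast("string").alias("json"))        # binary -> string
  .select(from_json("json", payload_schema).alias("data"))  # string -> struct
  .select("data.*")                                         # flatten into columns
)
```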

Then I try to write the data to a Delta table, but that code only outputs 'Stream Initializing' and gets stuck there.

I would appreciate some help, because I cannot figure out what I am doing wrong or missing here.


Kaniz
Community Manager

Hi @Pbarbosa154, based on the information provided, the issue you're facing might be that you are not setting a checkpoint location when writing to the Delta table. In Structured Streaming, the checkpoint location stores metadata about the streaming query so it can recover from failures. Without it, the stream might hang at the initialization stage.

Pbarbosa154
New Contributor III

Hi @Kaniz, thanks for the answer. But I do set a checkpoint location when writing. This is the code:

```
delta_table_path = "/mnt/delta-table-path"

# Write the stream to a Delta table, checkpointing on DBFS
(df.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/mnt/checkpoint-location")
  .start(delta_table_path)
)
```
Kaniz
Community Manager

Hi @Pbarbosa154, you are using DBFS for checkpoint storage in your code. The issue you are facing could be the DBFS root filling up, which would cause your streaming job to get stuck. Databricks recommends using persistent cloud storage for streaming checkpoints instead of DBFS, so you should point the checkpointLocation option at a cloud storage path.
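For example, a minimal sketch of that change, assuming ADLS via an abfss URI (the container and storage account below are placeholders; an s3:// or gs:// path works the same way):

```
# Checkpoint on persistent cloud storage rather than the DBFS root.
# The abfss URI is a placeholder -- substitute your own container and account.
checkpoint_path = "abfss://checkpoints@<storage_account>.dfs.core.windows.net/kafka-ingest"

(df.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", checkpoint_path)
  .start(delta_table_path)  # delta_table_path as defined in the earlier snippet
)
```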

Pbarbosa154
New Contributor III

What about using the Hive metastore in Databricks? And maybe that's an issue, but I tried running this pipeline to process only one message and it still got stuck at stream initialization.

Tharun-Kumar
Honored Contributor II

@Pbarbosa154 

Could you try display(df) after the readStream to see whether we are able to read data from Kafka? This will help us eliminate the possibility of Kafka read issues.
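For example (a sketch; casting the binary key and value columns makes the output readable):

```
# Sanity check: can we read anything from the topic at all?
display(df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)", "topic", "offset"))
```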

Pbarbosa154
New Contributor III

I also get stuck with this...

(screenshot attached: Screenshot 2023-09-07 at 14.37.39.png)

Could it be a problem with cluster memory? Or network issues related to the connection to the virtual machine?

jose_gonzalez
Moderator

You need to check the driver's logs while your stream is initializing. Look at the driver's log4j output; if there is an issue connecting to your Kafka broker, you will be able to see it there.

Pbarbosa154
New Contributor III

Yeah, in fact, when checking the log4j logs I see the following:

```
23/09/11 09:11:27 WARN NetworkClient: [Consumer clientId=consumer-spark-kafka-source-3e9266a6-081d-4946-b41e-38873d2b01c0--1036396469-driver-0-1, groupId=spark-kafka-source-3e9266a6-081d-4946-b41e-38873d2b01c0--1036396469-driver-0] Bootstrap broker VM_IP (id: -1 rack: null) disconnected
```

I added 'listeners=PLAINTEXT://VM_IP:9092' to the Kafka config (a solution I found when searching for the issue), but I am still having issues when trying to connect to the VM.
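For reference, when clients connect from outside the VM, the broker typically also needs advertised.listeners set to an address those clients can reach, not just listeners. A sketch of the relevant server.properties entries (the external IP is a placeholder):

```
# server.properties (sketch): bind on all interfaces, advertise an externally reachable address
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://<VM_EXTERNAL_IP>:9092
```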

 

Update: after changing the IP address to the machine's external IP, I get:

```
23/09/11 10:14:47 INFO AppInfoParser: Kafka version: 7.4.0-ccs
23/09/11 10:14:47 INFO AppInfoParser: Kafka commitId: 30969fa33c185e88
23/09/11 10:14:47 INFO AppInfoParser: Kafka startTimeMs: 1694427287346
23/09/11 10:14:47 INFO KafkaConsumer: [Consumer clientId=consumer-spark-kafka-source-51917966-dd8d-4b6b-9532-6076a916ea5b-998856815-driver-0-1, groupId=spark-kafka-source-51917966-dd8d-4b6b-9532-6076a916ea5b-998856815-driver-0] Subscribed to topic(s): <topicName>
```

But soon after, it closes the connection again...

Kaniz
Community Manager

Hi @Pbarbosa154, if one of the replies above resolved your issue, please come back and accept it as the solution so the community can benefit from it.
