Spark is not reading Kinesis Data as fast as specified

938452
New Contributor III

Hi Databricks community team,

I have the following code:

"""

df = spark.readStream \
.format("kinesis") \
.option("endpointUrl", endpoint_url) \
.option("streamName", stream_name) \
.option("initialPosition", "latest") \
.option("consumerMode", "efo") \
.option("maxFetchDuration", "500ms") \
.load()
"""
 
With maxFetchDuration set, I expected data to be fetched quickly, but it still looked like batched reads spanning multiple seconds. So I added a timestamp to track when each record starts to get processed, alongside the approximateArrivalTimestamp from Kinesis:
"""
from pyspark.sql import functions as F  # needed for current_timestamp, col, from_json

df = df \
.selectExpr("approximateArrivalTimestamp", "cast(data as STRING) data") \
.withColumn("processed_timestamp", F.current_timestamp()) \
.select(F.col("approximateArrivalTimestamp"), F.col("processed_timestamp"), F.from_json("data", SOME_SCHEMA).alias("data_fields")) \
.select("approximateArrivalTimestamp", "processed_timestamp", "data_fields.*")
"""
 
I do satisfy the sizing guideline # cores in cluster >= 2 * (# Kinesis shards) / shardsPerTask: (8 cores * 4 workers) = 32 >= 2 * 64 / 5 = 25.6. I'm using the latest Databricks Runtime 14.0 (Spark 3.5.0). This is the only consumer of the stream, so no other consumer is competing for throughput, and EFO is enabled.
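For reference, the sizing check above is plain arithmetic; a quick sketch using my cluster's figures (8 cores/worker, 4 workers, 64 shards, shardsPerTask = 5 — adjust for your own setup):

```python
# Sizing guideline quoted above:
#   cores_in_cluster >= 2 * num_shards / shards_per_task
cores_per_worker = 8
num_workers = 4
num_shards = 64
shards_per_task = 5

total_cores = cores_per_worker * num_workers       # 32
required_cores = 2 * num_shards / shards_per_task  # 25.6

assert total_cores >= required_cores, "cluster is undersized for the shard count"
```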
 
There is consistently a gap of roughly 30 seconds between approximateArrivalTimestamp and processed_timestamp. What can I do to lower the gap?
 
Attaching evidence of Spark processing records in the same chunk even though they arrived in Kinesis a few seconds apart.
1 ACCEPTED SOLUTION

Kaniz
Community Manager

Hi @938452, I can suggest a few things that might help you:

1. **Check your network latency:** The latency between your Spark cluster and the Kinesis stream can add to the delay. Ensure your Spark cluster and Kinesis stream are in the same region to minimize network latency.

2. **Adjust the batch interval:** The batch interval of your Spark Streaming job can affect the processing time. If your batch interval is too large, it might cause delays. Try reducing the batch interval to process the data more frequently.

3. **Tune your Spark job:** You can tune your Spark job to process the data faster. This includes adjusting the number of cores, the amount of memory allocated to each executor, and the number of executors.

4. **Check your data processing code:** The code you use to process the data can also affect the processing time. Ensure your code is optimized and contains no operations that can slow down the processing.

5. **Use Kinesis Client Library (KCL):** KCL provides a high-level API for processing data from Kinesis. It also handles complex tasks associated with distributed computing, such as load balancing across multiple instances, responding to instance failures, and coordinating and checkpointing record processing. 
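For point 2, the micro-batch cadence in Structured Streaming is controlled by the query trigger. A minimal sketch, assuming a running SparkSession and using a placeholder sink (the `memory` format and query name here are illustrative, not from the original post):

```python
# Hypothetical sketch: tighten the micro-batch trigger so new Kinesis records
# are picked up more often. Requires an active SparkSession and the streaming
# DataFrame `df` from above; not runnable standalone.
query = (df.writeStream
         .format("memory")                     # placeholder sink for illustration
         .queryName("kinesis_latency_check")
         .trigger(processingTime="1 second")   # smaller interval = more frequent batches
         .start())
```

Note that without an explicit trigger, Spark already starts each micro-batch as soon as the previous one finishes, so an explicit short interval mainly helps if a larger one was configured elsewhere.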

Unfortunately, it's hard to provide a more precise answer without more specific information about your setup and your data.

I recommend contacting Databricks support by filing a support ticket for more tailored assistance.


