spark streaming listener is lagging

Soma
Valued Contributor

We use a PySpark streaming listener and it is lagging by about 10 hours.

Data streamed at 10 AM IST only gets logged at around 10 PM IST.

Can someone explain how the logging listener interface works?

10 REPLIES

willam45
New Contributor II

Hello,
I can share some input on this.

The behavior you're describing, where the data streamed at 10 AM IST is being logged at 10 PM IST, indicates a significant lag in your PySpark streaming application. The PySpark StreamingListener interface is not responsible for causing this lag; it is used to monitor and collect information about the progress of your streaming application.

The PySpark StreamingListener interface allows you to create custom listeners that can capture events and metrics related to the execution of your streaming application. This can include events like batch processing times, number of records processed, and other execution statistics. However, the listener itself does not affect the timing or behavior of your streaming application.

If you're facing a significant lag in your streaming application's processing time, you should investigate other aspects of your setup to identify the cause.
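For reference, here is a minimal sketch of such a listener (assuming PySpark 3.4+, where StreamingQueryListener is available from Python). It only observes progress events; it does not drive the query:

from pyspark.sql import SparkSession
from pyspark.sql.streaming import StreamingQueryListener

class ProgressLogger(StreamingQueryListener):
    # Observes query events only; it does not alter or drive the streaming query.
    def onQueryStarted(self, event):
        print(f"query started: id={event.id}, run={event.runId}")

    def onQueryProgress(self, event):
        p = event.progress
        # p.timestamp is the trigger time of the micro-batch, not the time this callback runs
        print(f"batch {p.batchId}: {p.numInputRows} rows, trigger time {p.timestamp}")

    def onQueryTerminated(self, event):
        print(f"query terminated: id={event.id}")

spark = SparkSession.builder.getOrCreate()
spark.streams.addListener(ProgressLogger())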

Soma
Valued Contributor

Hi @willam45, the Spark streaming application itself is working perfectly, as I can see from the Structured Streaming UI.

It is this custom listener, which I use for real-time logging and alerting, that has the issue.

I just wanted to know how Spark handles the listener asynchronously, and whether it is a fire-and-forget mechanism.

-werners-
Esteemed Contributor III

adding to this:
what logging do you use?

Soma
Valued Contributor

We use Azure Log Analytics, and within Log Analytics itself we don't see a lag: the current_timestamp value we capture in the listener and the timestamp_generated column that Log Analytics adds automatically when the data is written differ by only about a minute.

The problem is that progress.timestamp, which is the stream processing time, shows 10 AM, while timestamp_generated and the current_timestamp captured in the listener code both point to the current time (10 PM).

-werners-
Esteemed Contributor III

Which one is the correct one: 10 PM or 10 AM?
When you mention progress.timestamp, do you use onQueryProgress? Because that is indeed handled asynchronously; it is called whenever there is a status update.
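A quick way to check whether the callbacks themselves arrive late is to compare progress.timestamp with the wall clock inside onQueryProgress. A sketch (assuming the progress timestamp is the usual UTC ISO-8601 string that Structured Streaming reports):

from datetime import datetime, timezone

def onQueryProgress(self, event):
    progress = event.progress
    # progress.timestamp looks like "2023-10-05T04:30:00.123Z" (UTC)
    ts = progress.timestamp.split('.')[0].rstrip('Z')
    trigger_time = datetime.strptime(ts, "%Y-%m-%dT%H:%M:%S").replace(tzinfo=timezone.utc)
    delay = datetime.now(timezone.utc) - trigger_time
    # Hours of delay here means the event itself is delivered late;
    # a small delay means the lag is added later, e.g. during ingestion.
    print(f"batch {progress.batchId}: callback delay = {delay}")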

Soma
Valued Contributor

I have added the timestamps below:

microbatch_timestamp, which comes from the stream progress, shows 10 AM.

batchId, the Spark batch ID of the stream, also corresponds to 10 AM.

Whereas ingested_timestamp shows around 10 PM, which is when the record is loaded into Log Analytics.

from datetime import datetime

def onQueryProgress(self, queryProgress):
    try:
        print("inside on progress")
        # Write the input rate and processing time to ADLS
        if queryProgress.progress.numInputRows >= 0:
            progress = queryProgress.progress
            data = {
                "Stream_id": str(progress.id),
                "Stream_name": progress.name,
                "Event_type": "Query Progress",
                "runID": str(progress.runId),
                # trigger time of the micro-batch, as reported by Spark
                "microbatch_timestamp": progress.timestamp.split('.')[0],
                # wall-clock time at the moment this callback runs
                "ingested_timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"),
                "batchId": progress.batchId,
                # ... (remainder of the snippet truncated in the original post)

-werners-
Esteemed Contributor III

I see.
The timestamps from the stream seem to be correct (as they are delivered through the StreamingQueryListener).

Could it be a timezone issue?
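A quick way to rule that out (sketch, assuming spark is your active SparkSession): print the session time zone Spark uses for its timestamps and compare the driver's naive and UTC clocks.

from datetime import datetime, timezone

# Time zone Spark uses for session-local timestamps (defaults to the JVM time zone)
print(spark.conf.get("spark.sql.session.timeZone"))

# Driver wall clock, naive vs. UTC; a large gap here would point to a TZ mismatch
print(datetime.now(), datetime.now(timezone.utc))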

Soma
Valued Contributor

No, Spark runs in UTC, and Log Analytics and the Python modules also use UTC.

One more key point: in dev we run the same code, and there the latency is under 2 minutes.

Something is potentially wrong with the PySpark listener.

-werners-
Esteemed Contributor III

What is the correct time, 10 AM or 10 PM?

Because 10 PM has nothing to do with the listener; it is just now().
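One more thing worth checking: if the callback does a slow, blocking write to Log Analytics, the events behind it queue up, since progress events are delivered to a listener one at a time. That would produce exactly this pattern: old microbatch_timestamp values paired with a fresh now(). A sketch that moves the write off the callback (send_to_log_analytics is a hypothetical stand-in for whatever HTTP client call you already use):

import queue
import threading

payloads = queue.Queue()

def _sender():
    # Background thread: a slow Log Analytics call never blocks the listener callback.
    while True:
        send_to_log_analytics(payloads.get())  # hypothetical: your existing HTTP client call

threading.Thread(target=_sender, daemon=True).start()

def onQueryProgress(self, event):
    progress = event.progress
    data = {
        "microbatch_timestamp": progress.timestamp.split('.')[0],
        "batchId": progress.batchId,
    }
    payloads.put(data)  # returns immediately; the callback never waits on the network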

jerrymark
New Contributor II

When you're experiencing lag in Spark Streaming, it means that the system is not processing data in real-time, and there is a delay in data processing. This delay can be caused by various factors, and diagnosing and addressing the issue requires careful investigation. Here are some common reasons why Spark Streaming might lag and potential solutions:

1. **Resource Constraints**:
- **Insufficient CPU or Memory**: If your Spark cluster doesn't have enough resources (CPU cores, memory) to handle the incoming data rate, it can lead to lag. Consider scaling up your cluster or optimizing your code to be more memory-efficient.

2. **Backpressure**:
- **Data Input Rate > Processing Rate**: If data is ingested into Spark at a higher rate than it can be processed, it can lead to backpressure and lag. Ensure that your processing logic can keep up with the data input rate. You can monitor this using Spark's built-in metrics.

3. **Garbage Collection (GC) Overheads**:
- **Frequent GC**: Frequent garbage collection can cause delays in processing. Monitor the GC activity in your Spark application and adjust memory settings if necessary.

4. **Inefficient Code**:
- **Complex Transformations**: Complex operations or transformations on the data can slow down processing. Optimize your code to be as efficient as possible, and consider using Spark's built-in functions for common operations.

5. **Checkpointing and State Management**:
- **Inefficient Checkpointing**: Checkpointing too frequently or not frequently enough can affect performance. Adjust the checkpointing interval based on your application requirements.
- **Stateful Operations**: If you are using stateful operations (e.g., `updateStateByKey`), make sure you manage state efficiently to avoid excessive memory consumption.

6. **Data Skew**:
- **Uneven Data Distribution**: Uneven data distribution across partitions can lead to some partitions processing more data than others, causing lag. Re-partition your data to achieve a more balanced distribution.

7. **External Dependencies**:
- **Slow Data Sources or Sinks**: If you're reading from or writing to external data sources or sinks (e.g., databases), slow response times can cause lag. Optimize your external dependencies if possible.

8. **Network Issues**:
- **Network Bottlenecks**: Slow network connections between Spark components (e.g., between nodes in a cluster) can cause lag. Ensure that your network infrastructure is robust and doesn't introduce delays.

9. **Application-Level Logging and Debugging**:
- Enable Spark's application-level logging and monitoring to identify bottlenecks and performance issues in your specific application.

10. **Spark Configuration Tuning**:
- Tune Spark configuration settings such as `spark.streaming.backpressure.enabled`, `spark.streaming.receiver.maxRate`, and others based on your use case and cluster resources (see the sketch at the end of this reply).

To diagnose and resolve lag in Spark Streaming, it's essential to monitor and analyze the specific metrics and logs of your application. Use Spark's web UI, logs, and monitoring tools to gain insights into the bottlenecks and then apply the appropriate optimizations or adjustments.
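As a concrete example of point 10, a minimal sketch of setting those legacy (DStream-era) backpressure settings; the values are illustrative, not recommendations for any particular workload:

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (
    SparkConf()
    .set("spark.streaming.backpressure.enabled", "true")  # let Spark throttle ingestion
    .set("spark.streaming.receiver.maxRate", "10000")     # max records/sec per receiver
)
spark = SparkSession.builder.config(conf=conf).getOrCreate()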
