Spark Structured Streaming: Data write into ADLS is too slow.

UmaMahesh1
Honored Contributor III

I'm a bit new to Spark Structured Streaming, so do ask for any relevant details I've missed.

I have a notebook which consumes events from a Kafka topic and writes those records into ADLS. The topic is JSON-serialized, so I'm writing the value column into ADLS as-is, as a JSON string, without flattening. (I would do the flattening later in a separate notebook, providing a schema.)

For the writer, I set the trigger to availableNow = true, and maxOffsetsPerTrigger is set to 5000. While consuming the data, I also add a current_timestamp column to identify when each event was consumed. I run the notebook once a day as I don't want to over-provision resources.
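
For reference, the reader and writer look roughly like this (a sketch with placeholder broker, topic, and paths, not the exact notebook code):

from pyspark.sql.functions import col, current_timestamp

raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<broker:9092>")   # placeholder
    .option("subscribe", "<topic>")                        # placeholder
    .option("maxOffsetsPerTrigger", 5000)
    .load()
    .select(col("value").cast("string").alias("json_value"),
            current_timestamp().alias("consumed_at")))

(raw.writeStream
    .format("json")    # raw JSON strings; flattening happens in a later notebook
    .option("checkpointLocation", "abfss://<container>@<account>.dfs.core.windows.net/checkpoints/raw")
    .option("path", "abfss://<container>@<account>.dfs.core.windows.net/raw")
    .trigger(availableNow=True)
    .start())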

Since availableNow = true processes all the data available since the last checkpoint in micro-batches, my expectation was that chunks of close to 5,000 records would be written to ADLS. Instead, I'm finding that a seemingly random number of entries, ranging from 1 to 1,000, is written every 5-15 minutes, which I can tell from the current_timestamp column I add while reading the topic.

E.g. 20-11-2022 10:30:00:000 - 10 records

20-11-2022 10:35:00:000 - 1 record

20-11-2022 10:45:00:000 - 250 records .....

Because of this odd processing pattern, if the producer publishes around 2,000 events in a day, it takes around 45 minutes to an hour to consume and load the data into ADLS.

Can anyone explain why this is happening? Running a pipeline job for an hour to load 1,000 records definitely seems like overkill.

P.S. This has nothing to do with the cluster, as the cluster itself is a very high-performance one in the production environment.


Hubert-Dudek
Esteemed Contributor III

I would:

  • remove maxOffsetsPerTrigger,
  • analyze the Spark UI,
  • use display() to see how the stream is read (without writing it to Delta), or alternatively write to a noop sink, as the problem can be with the read, the write, or with Kafka itself (see the sketch after the snippets below),
  • set the stream's shuffle partitions to the number of cores:

spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

and in the stream reader add .option("minPartitions", sc.defaultParallelism)
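
A rough sketch of the noop test (same Kafka options as your notebook; noop discards the output, so if this is still slow the bottleneck is on the read side):

test_stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<broker:9092>")   # placeholder
    .option("subscribe", "<topic>")                        # placeholder
    .option("minPartitions", sc.defaultParallelism)        # spread the Kafka read across all cores
    .load())

(test_stream.writeStream
    .format("noop")                                        # no-op sink: measures read/processing cost only
    .option("checkpointLocation", "/tmp/checkpoints/noop-test")   # throwaway checkpoint for the test
    .trigger(availableNow=True)
    .start())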

UmaMahesh1
Honored Contributor III

Any particular reason why I should remove maxOffsetsPerTrigger?

Also, when I consume the same Confluent Kafka topic in a different environment (non-prod) using the same notebook, it takes only around 1 minute. 😅 There are no event transmission issues in either non-prod or prod.

I'll get back once I've checked out the 3rd and 4th options.

Thanks.

Hubert-Dudek
Esteemed Contributor III

Just for testing.

-werners-
Esteemed Contributor III

My guess is this is because of how the topic is partitioned. Check the docs on maxOffsetsPerTrigger:

Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.

https://spark.apache.org/docs/3.1.3/structured-streaming-kafka-integration.html
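
As a toy illustration (made-up backlog numbers, plain Python rather than Spark internals), a 5,000-offset cap gets split in proportion to each partition's backlog:

backlog = {"p0": 1200, "p1": 300, "p2": 8500}   # hypothetical per-partition lag
cap = 5000                                      # maxOffsetsPerTrigger
total = sum(backlog.values())
per_partition = {p: round(cap * lag / total) for p, lag in backlog.items()}
print(per_partition)                            # {'p0': 600, 'p1': 150, 'p2': 4250}

So partitions holding only a small share of the backlog contribute just a handful of records to each micro-batch.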

jose_gonzalez
Moderator

Where is the job spending its time? Would you mind sharing the JSON query metrics? Go to the driver's logs; this will help us narrow down the bottleneck.
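
For example, you can dump the metrics of the last micro-batch straight from the notebook (a sketch; the query handle comes from writeStream.start(), or from spark.streams.active):

import json

query = spark.streams.active[0]                    # or keep the handle returned by .start()
print(json.dumps(query.lastProgress, indent=2))    # numInputRows, durationMs, sources, sink, ...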


Miletto
New Contributor II
The availableNow and Once triggers don't respect maxFilesPerTrigger or maxOffsetsPerTrigger.
The only way to use those options is in regular streaming mode.
The availableNow trigger internally decides the number of files/rows to read so as not to exceed the available memory.
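
For instance, a regular micro-batch stream where the rate limit applies per trigger would look roughly like this (a sketch, placeholder options only):

(spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<broker:9092>")   # placeholder
    .option("subscribe", "<topic>")                        # placeholder
    .option("maxOffsetsPerTrigger", 5000)
    .load()
    .writeStream
    .format("json")
    .option("checkpointLocation", "<checkpoint path>")     # placeholder
    .option("path", "<output path>")                       # placeholder
    .trigger(processingTime="1 minute")                    # continuous micro-batches instead of availableNow
    .start())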

Kaniz
Community Manager

Hi @UmaMahesh1,

• The observed behaviour comes down to how Spark Structured Streaming reads from Kafka in micro-batches.
• The parameter maxOffsetsPerTrigger in Spark Structured Streaming determines the maximum rate of data read from Kafka.
• However, it doesn't guarantee processing precisely that number of records in each trigger.
• Factors like data availability, processing time, and Kafka consumer fetch size can result in fewer records being processed in a trigger.
• Variability in processing time or trigger interval can cause longer intervals between written records.
• In the case of running the notebook once a day, if the Kafka topic has fewer than 5,000 new records, fewer records will be processed.
