11-29-2022 10:38 AM
I'm a bit new to Spark Structured Streaming, so please ask any relevant questions if I've missed something.
I have a notebook which consumes events from a Kafka topic and writes those records into ADLS. The topic is JSON-serialized, so I'm just writing the value column into ADLS as a JSON string, as-is, without flattening. (I'll do the flattening later in a separate notebook that provides the schema.)
On the writer, I set the trigger to availableNow = true, and maxOffsetsPerTrigger is set to 5000. While consuming the data, I also add a current_timestamp column to record when each event was consumed. I run the notebook once a day, as I don't want to overspend on resources.
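For context, here's roughly what the notebook does (a minimal sketch; the broker address, topic name, and ADLS paths are placeholders):

from pyspark.sql.functions import col, current_timestamp

# Read the topic; maxOffsetsPerTrigger caps the offsets pulled per micro-batch
raw = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<broker:9092>")  # placeholder
    .option("subscribe", "<topic>")                      # placeholder
    .option("maxOffsetsPerTrigger", 5000)
    .load())

# Keep the payload as a raw JSON string and stamp the consumption time
events = (raw
    .select(col("value").cast("string").alias("json_value"))
    .withColumn("consumed_at", current_timestamp()))

# Drain everything since the last checkpoint in micro-batches, then stop
(events.writeStream
    .format("json")
    .option("checkpointLocation", "<adls-checkpoint-path>")  # placeholder
    .trigger(availableNow=True)
    .start("<adls-output-path>"))                            # placeholder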
Since availableNow = true processes all the data available since the last checkpoint in micro-batches, my expectation was that chunks of close to 5000 records would be written to ADLS. Instead, what I'm finding is that a seemingly random number of records, anywhere from 1 to 1000, is written at intervals of 5-15 minutes, which I can see from the current_timestamp column I add while reading the topic.
E.g.
20-11-2022 10:30:00:000 - 10 records
20-11-2022 10:35:00:000 - 1 record
20-11-2022 10:45:00:000 - 250 records
...
Because of this odd processing, if the producer publishes around 2000 events in a day, it takes around 45 minutes to an hour to consume and load the data into ADLS.
Can anyone explain why this is happening? Running a pipeline job for an hour to load 1000 records definitely seems like overkill.
P.S. This has nothing to do with the cluster; the cluster itself is a very high-performance one in the production environment.
11-29-2022 12:18 PM
I would set
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)
and in the stream add
.option("minPartitions", sc.defaultParallelism)
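Putting those together (a sketch in PySpark; broker and topic are placeholders):

# Match shuffle partitions to the cluster's default parallelism
spark.conf.set("spark.sql.shuffle.partitions", sc.defaultParallelism)

# And ask the Kafka source to expose at least that many input partitions
df = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "<broker:9092>")  # placeholder
    .option("subscribe", "<topic>")                      # placeholder
    .option("minPartitions", sc.defaultParallelism)
    .load())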
11-29-2022 12:35 PM
Any particular reason why I should remove maxOffsetsPerTrigger?
Also, when I consume the same Confluent Kafka topic in a different (non-prod) environment using the same notebook, it takes only around 1 minute. 😅 There are no event-transmission issues in either non-prod or prod.
Maybe I will get back once I check out the 3rd and 4th options.
Thanks.
11-30-2022 04:52 AM
Just for tests.
11-30-2022 12:55 AM
My guess is this is because of how the topic is partitioned. Check the docs on maxOffsetsPerTrigger:
Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume.
https://spark.apache.org/docs/3.1.3/structured-streaming-kafka-integration.html
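As a rough illustration of that proportional split (this is just the arithmetic, not the actual Spark implementation):

# Divide the per-trigger offset cap across partitions by backlog size
def split_limit(backlogs, max_offsets):
    total = sum(backlogs.values())
    if total <= max_offsets:
        return backlogs  # the whole backlog fits in one batch
    return {p: int(max_offsets * n / total) for p, n in backlogs.items()}

# A skewed topic: one partition holds most of the backlog
print(split_limit({"p0": 100, "p1": 400, "p2": 9500}, 5000))
# {'p0': 50, 'p1': 200, 'p2': 4750}

With a skewed topic, some partitions contribute only a handful of records per batch, which would line up with the tiny writes you're seeing.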
01-30-2023 04:40 PM
Where is the job taking time? Would you mind sharing the JSON query metrics? Go to the driver's logs. This will help us narrow down the bottleneck.
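If it's easier than digging through the logs, the same progress JSON is also exposed on the query handle in PySpark (a sketch; query is whatever your writeStream.start() returned):

import json

# Metrics for the most recent micro-batch
print(json.dumps(query.lastProgress, indent=2))

# Or scan recent batches for rows-per-batch and durations
for p in query.recentProgress:
    print(p["numInputRows"], p["durationMs"])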
08-28-2023 02:10 PM
The triggers availableNow and Once don't respect maxFilesPerTrigger or maxOffsetsPerTrigger.
The only way to use this option is in stream mode.
The availableNow trigger internally decides the number of files/rows read so as not to exceed the available memory.
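So, per this answer, if you need the 5000-offset cap honored you'd run a long-lived stream instead, something like the following (a sketch reusing the events frame from earlier; the interval and paths are illustrative):

# Micro-batch "stream mode": maxOffsetsPerTrigger applies per interval here
(events.writeStream
    .format("json")
    .option("checkpointLocation", "<adls-checkpoint-path>")  # placeholder
    .trigger(processingTime="1 minute")
    .start("<adls-output-path>"))                            # placeholder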