05-20-2024 03:57 AM
Hello Everyone,
In my project I am using Databricks Auto Loader to incrementally and efficiently process new data files as they arrive in cloud storage.
I am using file notification mode, with an Event Grid subscription and a Queue service set up in the Azure storage account to receive file events from the input directory.
Each file is about 65 KB, and I have received 3 million file events.
My code is as below to process the files.
Problem
One record is one json file here and I monitor the numInputRows attribute for each batch to check how many files are getting processed in a batch.
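As a minimal sketch of that monitoring step, the snippet below pulls the per-batch counters out of one streaming progress record. It assumes the record is available as a JSON string; in PySpark, `query.lastProgress` exposes the same fields as a dict. The helper name `summarize_progress` is hypothetical, and the sample is a trimmed copy of the log in this post, not a full record.

```python
import json


def summarize_progress(progress_json: str) -> dict:
    """Extract the per-batch counters from one streaming progress record."""
    progress = json.loads(progress_json)
    # This stream has a single Auto Loader source, so take the first entry.
    metrics = progress["sources"][0].get("metrics", {})
    return {
        "batchId": progress["batchId"],
        "numInputRows": progress["numInputRows"],
        "triggerExecutionMs": progress["durationMs"]["triggerExecution"],
        # The source metrics are reported as strings, so convert them.
        "numFilesOutstanding": int(metrics.get("numFilesOutstanding", 0)),
        "approximateQueueSize": int(metrics.get("approximateQueueSize", 0)),
    }


# Trimmed sample: only the fields the helper reads, values taken from batch 1319.
sample = (
    '{"batchId": 1319, "numInputRows": 952, '
    '"durationMs": {"triggerExecution": 3349}, '
    '"sources": [{"metrics": {"approximateQueueSize": "0", '
    '"numBytesOutstanding": "0", "numFilesOutstanding": "0"}}]}'
)

summary = summarize_progress(sample)
print(summary)
```

Since one record is one file here, numInputRows can be read as the number of files in the batch.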
{ "id" : "d49e1e9c-ac58-4da0-8ccb-9aa1790a7e40", "runId" : "746a885c-c2d8-4ec8-91c6-6c7126c85e64", "name" : null, "timestamp" : "2024-05-16T09:36:49.295Z", "batchId" : 1319, "batchDuration" : 3349, "numInputRows" : 952, "inputRowsPerSecond" : 161.19200812732814, "processedRowsPerSecond" : 284.2639593908629, "durationMs" : { "addBatch" : 2627, "commitOffsets" : 153, "getBatch" : 42, "latestOffset" : 352, "queryPlanning" : 9, "triggerExecution" : 3349, "walCommit" : 147 }, "stateOperators" : [ ], "sources" : [ { "description" : "CloudFilesSource[abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/]", "startOffset" : { "seqNum" : 7835887, "sourceVersion" : 1, "lastBackfillStartTimeMs" : 1715782085477, "lastBackfillFinishTimeMs" : 1715782478832, "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/" }, "endOffset" : { "seqNum" : 7837532, "sourceVersion" : 1, "lastBackfillStartTimeMs" : 1715782085477, "lastBackfillFinishTimeMs" : 1715782478832, "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/" }, "latestOffset" : null, "numInputRows" : 952, "inputRowsPerSecond" : 161.19200812732814, "processedRowsPerSecond" : 284.2639593908629, "metrics" : { "approximateQueueSize" : "0", "numBytesOutstanding" : "0", "numFilesOutstanding" : "0" } } ], "sink" : { "description" : "ForeachBatchSink", "numOutputRows" : -1 }, "observedMetrics" : { "15012_5f540a60-11bb-4db9-9246-14bb346f1ad2_compress" : { "PipelineRunID" : "fabf2d53-29d6-4ef6-ab7d-665f99227c9c", "BatchStatus" : "Fail", "AppId" : "5f540a60-11bb-4db9-9246-14bb346f1ad2", "SchemaId" : 15012, "SchemaVersion" : 1, "Priority" : 10, "InputRecordCount" : 952, "OutputRecordCount" : 952 } } }
Cluster Configuration
Driver: Standard_F32s_v2 · Workers: Standard_F16s_v2 · 30-50 workers · 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)
05-21-2024 03:54 AM
Hi Kaniz,
Thank you for your response.
I have read through all your points from the official documentation page.
Can you let me know how I can achieve the below?
05-21-2024 05:58 AM
Hi @Sambit_S,
File notification mode only picks up newly arriving files, so existing data will not be loaded unless you configure a backfill interval. You could also do the initial backfill in directory listing mode and then switch to file notification mode for newly arriving data. This can be done with the same checkpoint.
From the logs you shared it shows that there are no outstanding bytes or files, so this could indicate that the existing data is not being loaded.
Let me know if it helps.
05-21-2024 07:34 AM
Hi matthew_m,
Please check my comments below and let me know if you find something.
05-21-2024 07:20 AM
File notification would only impact any new arriving files
Yes, I have all the 3 million files as newly arriving files as I generate synthetic data files for performance testing.
From the logs you shared it shows that there are no outstanding bytes or files.
I think that was the last batch of the stream. But look at the metrics below:
I believe "numBytesOutstanding" : "2605258897" and "numFilesOutstanding" : "60310" are wrong, because the numbers go up and down from one batch to the next.
{ "id" : "d49e1e9c-ac58-4da0-8ccb-9aa1790a7e40", "runId" : "6d3ac39b-42c1-4bd8-99c7-8477817b278e", "name" : null, "timestamp" : "2024-05-21T14:16:10.155Z", "batchId" : 1478, "batchDuration" : 35581, "numInputRows" : 998, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 28.04867766504595, "durationMs" : { "addBatch" : 25634, "commitOffsets" : 202, "getBatch" : 246, "latestOffset" : 2295, "queryPlanning" : 5687, "triggerExecution" : 35568, "walCommit" : 180 }, "stateOperators" : [ ], "sources" : [ { "description" : "CloudFilesSource[abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/]", "startOffset" : { "seqNum" : 18875879, "sourceVersion" : 1, "lastBackfillStartTimeMs" : 1715782085477, "lastBackfillFinishTimeMs" : 1715782478832, "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/" }, "endOffset" : { "seqNum" : 18877451, "sourceVersion" : 1, "lastBackfillStartTimeMs" : 1715782085477, "lastBackfillFinishTimeMs" : 1715782478832, "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/" }, "latestOffset" : null, "numInputRows" : 998, "inputRowsPerSecond" : 0.0, "processedRowsPerSecond" : 28.04867766504595, "metrics" : { "approximateQueueSize" : "5869492", "numBytesOutstanding" : "2605258897", "numFilesOutstanding" : "60310" } } ], "sink" : { "description" : "ForeachBatchSink", "numOutputRows" : -1 }, "observedMetrics" : { "15012_5f540a60-11bb-4db9-9246-14bb346f1ad2_compress" : { "PipelineRunID" : "5ba763c1-4e6f-4b8e-8555-9f42935fd6a3", "BatchStatus" : "Fail", "AppId" : "5f540a60-11bb-4db9-9246-14bb346f1ad2", "SchemaId" : 15012, "SchemaVersion" : 1, "Priority" : 10, "InputRecordCount" : 998, "OutputRecordCount" : 998 } } }
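To put the numbers from batch 1478 in perspective, here is a rough back-of-the-envelope sketch. It assumes every later batch also takes about 998 files and about 35.6 seconds, which will not hold exactly, and the helper name `estimate_drain` is hypothetical. It also ignores approximateQueueSize (5,869,492), which appears to count messages still waiting in the queue, so the real backlog is probably larger than numFilesOutstanding alone suggests.

```python
import math


def estimate_drain(files_outstanding: int, files_per_batch: int, batch_duration_ms: int):
    """Estimate how many batches and seconds are needed to clear a file backlog."""
    batches = math.ceil(files_outstanding / files_per_batch)
    seconds = batches * batch_duration_ms / 1000
    return batches, seconds


# Values copied from the batch 1478 progress log.
FILES_OUTSTANDING = 60310
BYTES_OUTSTANDING = 2605258897
FILES_IN_BATCH = 998
BATCH_DURATION_MS = 35581

batches, seconds = estimate_drain(FILES_OUTSTANDING, FILES_IN_BATCH, BATCH_DURATION_MS)
avg_file_bytes = BYTES_OUTSTANDING / FILES_OUTSTANDING

print(f"batches needed: {batches}")
print(f"time to drain: {seconds / 60:.1f} minutes")
print(f"average outstanding file size: {avg_file_bytes / 1024:.1f} KiB")
```

At this rate the discovered backlog alone needs about 61 batches, or roughly 36 minutes, and new files discovered in the meantime would explain why the outstanding counters rise and fall between batches.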
05-21-2024 07:42 AM
Hi @Sambit_S ,
From that last log I can see that it ingested 998 files, which is close to the expected default maximum that appears to be in effect in your code. maxBytesPerTrigger and maxFilesPerTrigger work together to define the soft upper bound for each batch.
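As a rough illustration of how the two limits interact, the sketch below computes which one is reached first. It is a simplified model, not Auto Loader's actual logic: it assumes the documented default of 1000 for cloudFiles.maxFilesPerTrigger, a uniform file size of 65 KB, and that "10g" means 10 GiB. The helper name `effective_files_per_batch` is hypothetical, and both limits are soft in practice.

```python
def effective_files_per_batch(
    avg_file_bytes: int, max_files_per_trigger: int, max_bytes_per_trigger: int
) -> int:
    """Return the per-batch file count allowed by whichever limit is hit first."""
    files_allowed_by_bytes = max_bytes_per_trigger // avg_file_bytes
    return min(max_files_per_trigger, files_allowed_by_bytes)


AVG_FILE_BYTES = 65 * 1024   # about 65 KB per file, as stated in the question
DEFAULT_MAX_FILES = 1000     # assumed default of cloudFiles.maxFilesPerTrigger
MAX_BYTES = 10 * 1024**3     # "10g" read as 10 GiB (assumption)

files_by_bytes = MAX_BYTES // AVG_FILE_BYTES
cap = effective_files_per_batch(AVG_FILE_BYTES, DEFAULT_MAX_FILES, MAX_BYTES)

print(f"files allowed by the byte limit: {files_by_bytes}")
print(f"effective files per batch: {cap}")
```

With files this small, a 10 GiB byte limit would allow roughly 161,000 files per batch, so under these assumptions the file-count limit is the one that binds.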
05-21-2024 07:51 AM
Hi matthew_m,
I have only set maxBytesPerTrigger to 10g, and the metrics for the next batch look something like the below.
05-22-2024 03:17 AM
Found some documentation on Azure Queue throughput which might be the reason.
Any idea how we can maximize the throughput? Is there an Auto Loader configuration that can help increase the streaming batch size?
05-22-2024 05:23 AM
Hi @Sambit_S ,
I misread inputRows as inputFiles, which aren't the same thing. Considering the limitation on Azure Queue storage, if you are already at the limit then you may need to consider switching to an event source such as Kafka or Event Hubs to get better ingestion performance.
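Before switching event sources, it may be worth checking the Auto Loader options that affect batch size and queue reads. The sketch below only builds an options dict; the option names are documented Auto Loader options, but the values 50000 and 8 are hypothetical starting points rather than recommendations, and the helper name `notification_options` is an assumption for illustration. Whether they help depends on where the actual bottleneck is.

```python
def notification_options(max_files_per_trigger: int, fetch_parallelism: int) -> dict:
    """Build Auto Loader options for file notification mode (illustrative values)."""
    return {
        "cloudFiles.format": "json",
        "cloudFiles.useNotifications": "true",
        # Raise the file-count limit above its default so it stops capping batches.
        "cloudFiles.maxFilesPerTrigger": str(max_files_per_trigger),
        # Byte limit as set in this thread.
        "cloudFiles.maxBytesPerTrigger": "10g",
        # Number of threads used to fetch messages from the queue service.
        "cloudFiles.fetchParallelism": str(fetch_parallelism),
    }


opts = notification_options(max_files_per_trigger=50000, fetch_parallelism=8)
for key, value in opts.items():
    print(f"{key} = {value}")

# On a cluster these would be passed to the reader, for example:
# spark.readStream.format("cloudFiles").options(**opts).load(input_path)
```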