Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Databricks Autoloader File Notification Not Working As Expected

Sambit_S
New Contributor III

Hello Everyone,

In my project I am using Databricks Auto Loader to incrementally and efficiently process new data files as they arrive in cloud storage.

I am using file notification mode, with an Event Grid subscription and a queue set up in the Azure storage account that subscribes to file events from the input directory.

Each file is about 65 KB, and I have received 3 million file events.

My code to process the files is below.

# queue_name, queue_conn_string, schema_path, pl_path, app_id, schema_id,
# schema_version and priority are defined earlier in the notebook.
from pyspark.sql.functions import input_file_name, lit

raw_payload_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.queueName", queue_name)
    .option("cloudFiles.connectionString", queue_conn_string)
    .option("cloudFiles.fetchParallelism", 10)
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", schema_path)
    # .option("cloudFiles.maxFilesPerTrigger", 50000)
    .option("cloudFiles.maxBytesPerTrigger", "10g")
    .option("multiline", "true")
    .load(pl_path)
    .withColumn("FilePath", input_file_name())
    .withColumn("AppId", lit(app_id))
    .withColumn("SchemaId", lit(schema_id))
    .withColumn("SchemaVersion", lit(schema_version))
    .withColumn("Priority", lit(priority))
)
 
# obs_df and the forEachBatch function are defined elsewhere in the notebook (not shown in the post).
payloadCompressStream = obs_df.writeStream.foreachBatch(forEachBatch) \
                              .trigger(availableNow=True) \
                              .option("checkpointLocation", checkpoint_path) \
                              .start()
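The forEachBatch handler itself is not shown in the post; as a rough, hypothetical sketch, it could look something like the following, assuming it simply appends each micro-batch to a Delta location (target_path is a made-up variable; the real handler also records the observed metrics seen in the logs below):

def forEachBatch(batch_df, batch_id):
    # Hypothetical body: append the micro-batch to a Delta path.
    (batch_df.write
             .format("delta")
             .mode("append")
             .save(target_path))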

Problem

  • When I set the option cloudFiles.maxFilesPerTrigger to 50000, only 2000 to 5000 files are triggered per batch.
  • The same is true when I set the option cloudFiles.maxBytesPerTrigger to 10g.
  • One JSON file corresponds to one record here, and I monitor the numInputRows attribute of each batch to check how many files are processed per batch:
    { "id" : "d49e1e9c-ac58-4da0-8ccb-9aa1790a7e40", "runId" : "746a885c-c2d8-4ec8-91c6-6c7126c85e64", "name" : null, "timestamp" : "2024-05-16T09:36:49.295Z", "batchId" : 1319, "batchDuration" : 3349, "numInputRows" : 952, "inputRowsPerSecond" : 161.19200812732814, "processedRowsPerSecond" : 284.2639593908629, "durationMs" : { "addBatch" : 2627, "commitOffsets" : 153, "getBatch" : 42, "latestOffset" : 352, "queryPlanning" : 9, "triggerExecution" : 3349, "walCommit" : 147 }, "stateOperators" : [ ], "sources" : [ { "description" : "CloudFilesSource[abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/]", "startOffset" : { "seqNum" : 7835887, "sourceVersion" : 1, "lastBackfillStartTimeMs" : 1715782085477, "lastBackfillFinishTimeMs" : 1715782478832, "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/" }, "endOffset" : { "seqNum" : 7837532, "sourceVersion" : 1, "lastBackfillStartTimeMs" : 1715782085477, "lastBackfillFinishTimeMs" : 1715782478832, "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/" }, "latestOffset" : null, "numInputRows" : 952, "inputRowsPerSecond" : 161.19200812732814, "processedRowsPerSecond" : 284.2639593908629, "metrics" : { "approximateQueueSize" : "0", "numBytesOutstanding" : "0", "numFilesOutstanding" : "0" } } ], "sink" : { "description" : "ForeachBatchSink", "numOutputRows" : -1 }, "observedMetrics" : { "15012_5f540a60-11bb-4db9-9246-14bb346f1ad2_compress" : { "PipelineRunID" : "fabf2d53-29d6-4ef6-ab7d-665f99227c9c", "BatchStatus" : "Fail", "AppId" : "5f540a60-11bb-4db9-9246-14bb346f1ad2", "SchemaId" : 15012, "SchemaVersion" : 1, "Priority" : 10, "InputRecordCount" : 952, "OutputRecordCount" : 952 } } }

Cluster Configuration

Driver: Standard_F32s_v2 · Workers: Standard_F16s_v2 · 30-50 workers · 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)

9 REPLIES

Kaniz
Community Manager

Hi @Sambit_S

cloudFiles.maxFilesPerTrigger:

  • This option specifies the maximum number of files processed in each micro-batch.
  • By default, it’s set to 1000.
  • When you set it to 50000, you expect it to trigger more files per batch, but you’re observing only 2000 to 5000 files per batch.
  • This is an upper-bound limit, and other factors (such as file sizes, backfill intervals, and event notification delivery) can affect the actual number of files processed. Consider adjusting this value based on your specific use case.

cloudFiles.maxBytesPerTrigger:

  • This option sets a soft maximum on the number of bytes processed in each micro-batch.
  • It works together with cloudFiles.maxFilesPerTrigger: each micro-batch stops growing at whichever limit is reached first, so both act as soft upper bounds rather than guarantees (a combined example is sketched below).
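For illustration, a minimal sketch of setting the two limits together, reusing the schema_path and pl_path variables from your snippet (the values are examples, not recommendations):

df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.schemaLocation", schema_path)
    # Both limits are soft caps; each micro-batch stops growing at whichever is reached first.
    .option("cloudFiles.maxFilesPerTrigger", 50000)
    .option("cloudFiles.maxBytesPerTrigger", "10g")
    .load(pl_path)
)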

Let me know if you need further assistance.

 

Sambit_S
New Contributor III

Hi Kaniz,

Thank you for your response.

I have read through all your points from the official documentation page.

Can you let me know how I can achieve the below?

  • I have 3 million JSON files of about 65 KB each, so the total size is roughly 186 GB. The data is in an ADLS Gen2 storage container, and the queue contains all of the event notifications.
  • I want to process 10 GB of data files per micro-batch of the Auto Loader stream with file event notifications enabled, so the total should come to around 20 batches (a quick sanity check of this estimate is below).
  • What cluster configuration should I use, and what other options do I need to set besides cloudFiles.maxBytesPerTrigger = 10g?
  • The readStream code is as shown above.
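A back-of-the-envelope check of that batch-count estimate in plain Python (numbers taken straight from the figures above):

# Rough arithmetic only: 3 million files of ~65 KB each, processed in ~10 GB micro-batches.
num_files = 3_000_000
file_size_bytes = 65 * 1024

total_bytes = num_files * file_size_bytes   # ~186 GiB in total
batch_cap_bytes = 10 * 1024**3              # cloudFiles.maxBytesPerTrigger = 10g

print(total_bytes / 1024**3)                # ~186 GiB
print(total_bytes / batch_cap_bytes)        # ~19-20 micro-batches expected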

matthew_m
New Contributor III

Hi @Sambit_S,

File notification mode only picks up newly arriving files, so unless you have a backfill interval configured to load existing data, it will not load the files that are already there. You could also backfill initially using directory listing mode and then switch to file notification for newly arriving data; this can be done with the same checkpoint (a sketch of the backfill option is shown below).

The logs you shared show no outstanding bytes or files, which could indicate that the existing data is not being loaded.
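For example, a minimal sketch of enabling a periodic backfill while staying in file notification mode; the interval value is only illustrative:

backfill_stream_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useNotifications", "true")
    # Periodically list the input path so files missed by (or older than) the notification queue are picked up.
    .option("cloudFiles.backfillInterval", "1 day")
    .option("cloudFiles.schemaLocation", schema_path)
    .load(pl_path)
)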

Let me know if it helps.

Sambit_S
New Contributor III

Hi matthew_m,

Please check my comments below and let me know if you find anything.

Sambit_S
New Contributor III

File notification would only impact any new arriving files

Yes, all 3 million files are newly arriving files, as I generate synthetic data files for performance testing.

From the logs you shared it shows that there are no outstanding bytes or files.

I think that was the last batch of the stream, but in the metric below, "numBytesOutstanding" : "2605258897" and "numFilesOutstanding" : "60310" seem wrong to me, because the numbers increase/decrease when the next batch runs.

{
  "id" : "d49e1e9c-ac58-4da0-8ccb-9aa1790a7e40",
  "runId" : "6d3ac39b-42c1-4bd8-99c7-8477817b278e",
  "name" : null,
  "timestamp" : "2024-05-21T14:16:10.155Z",
  "batchId" : 1478,
  "batchDuration" : 35581,
  "numInputRows" : 998,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 28.04867766504595,
  "durationMs" : {
    "addBatch" : 25634,
    "commitOffsets" : 202,
    "getBatch" : 246,
    "latestOffset" : 2295,
    "queryPlanning" : 5687,
    "triggerExecution" : 35568,
    "walCommit" : 180
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "CloudFilesSource[abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/]",
    "startOffset" : {
      "seqNum" : 18875879,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1715782085477,
      "lastBackfillFinishTimeMs" : 1715782478832,
      "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/"
    },
    "endOffset" : {
      "seqNum" : 18877451,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1715782085477,
      "lastBackfillFinishTimeMs" : 1715782478832,
      "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/"
    },
    "latestOffset" : null,
    "numInputRows" : 998,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 28.04867766504595,
    "metrics" : {
      "approximateQueueSize" : "5869492",
      "numBytesOutstanding" : "2605258897",
      "numFilesOutstanding" : "60310"
    }
  } ],
  "sink" : {
    "description" : "ForeachBatchSink",
    "numOutputRows" : -1
  },
  "observedMetrics" : {
    "15012_5f540a60-11bb-4db9-9246-14bb346f1ad2_compress" : {
      "PipelineRunID" : "5ba763c1-4e6f-4b8e-8555-9f42935fd6a3",
      "BatchStatus" : "Fail",
      "AppId" : "5f540a60-11bb-4db9-9246-14bb346f1ad2",
      "SchemaId" : 15012,
      "SchemaVersion" : 1,
      "Priority" : 10,
      "InputRecordCount" : 998,
      "OutputRecordCount" : 998
    }
  }
}

 

matthew_m
New Contributor III

Hi @Sambit_S ,

From that last log I can see that it ingested 998 files, which is close to the default maximum of 1000 files per trigger, since only maxBytesPerTrigger appears to be set in your code. maxBytesPerTrigger and maxFilesPerTrigger work together to define the soft upper bound for each micro-batch; whichever limit is reached first bounds the batch.
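In practice that means the default cap of 1000 files still applies when only maxBytesPerTrigger is set. A hedged sketch of raising the file cap so the 10g byte limit can actually take effect (the 160000 figure is simply 10 GB / 65 KB and purely illustrative):

raw_payload_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.schemaLocation", schema_path)
    # Without this, the default maxFilesPerTrigger of 1000 caps each micro-batch at ~1000 files.
    .option("cloudFiles.maxFilesPerTrigger", 160000)
    .option("cloudFiles.maxBytesPerTrigger", "10g")
    .load(pl_path)
)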

 

Sambit_S
New Contributor III

Hi matthew_m,

I have only set maxBytesPerTrigger to 10g, and the metrics for the next batch look something like the below.

[screenshot of the streaming metrics for the next batch]

Sambit_S
New Contributor III

Found some documentation on Azure Queue throughput which might be the reason.

[screenshot of the Azure Queue Storage throughput limits documentation]

Any idea how we can maximize the throughput? Is there some Spark Auto Loader configuration that can help increase the streaming batch size?

matthew_m
New Contributor III

Hi @Sambit_S ,

I misread inputRows as inputFiles, which aren't the same thing. Considering the limitation on the Azure queue, if you are already at that limit then you may need to consider switching to an event source such as Kafka or Event Hubs to get better ingestion performance (a rough sketch is below).
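If you do go that route, one common pattern is to read Event Hubs through its Kafka-compatible endpoint; a rough sketch with placeholder namespace, hub and connection-string values:

EH_NAMESPACE = "<event-hubs-namespace>"          # placeholder
EH_NAME = "<event-hub-name>"                     # placeholder
EH_CONN_STR = "<event-hubs-connection-string>"   # placeholder

events_df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", f"{EH_NAMESPACE}.servicebus.windows.net:9093")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option(
        "kafka.sasl.jaas.config",
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{EH_CONN_STR}";',
    )
    .option("subscribe", EH_NAME)
    .load()
)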
