Databricks Autoloader File Notification Not Working As Expected

Sambit_S

Hello Everyone,

In my project I am using Databricks Auto Loader to incrementally and efficiently process new data files as they arrive in cloud storage.
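As a rough mental model only, incremental ingestion means remembering which files have already been processed (Auto Loader keeps this state in its checkpoint) and picking up only the new ones on each run. This is a toy sketch, not Auto Loader's implementation; the helper `process_new_files` and the in-memory `checkpoint` set are hypothetical stand-ins for that checkpoint state.

```python
# Toy, in-memory model of incremental file ingestion (NOT Auto Loader's real
# implementation): files already recorded in the hypothetical "checkpoint" set
# are skipped, so each run processes only files that arrived since the last run.
from typing import Iterable, List, Set


def process_new_files(listing: Iterable[str], checkpoint: Set[str]) -> List[str]:
    """Return the files not yet seen, and record them in the checkpoint."""
    new_files = sorted(f for f in listing if f not in checkpoint)
    checkpoint.update(new_files)  # remember progress so later runs skip these
    return new_files


checkpoint: Set[str] = set()
print(process_new_files(["a.json", "b.json"], checkpoint))
print(process_new_files(["a.json", "b.json", "c.json"], checkpoint))
```

The second call returns only `c.json`, because the first two files are already in the checkpoint.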

I am using file notification mode, with Event Grid and a Queue Storage queue set up in the Azure storage account to subscribe to file events from the input directory.
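In file notification mode Auto Loader consumes these queue messages itself, so the pipeline needs no parsing code. Purely for illustration, the sketch below shows what a notification carries: it parses a trimmed `Microsoft.Storage.BlobCreated` event (the `eventType`, `subject` and `data.url` fields follow the Event Grid Blob Storage event schema) and keeps it only if the blob lies under the input directory. The account, container and path values and the helper `new_file_url` are hypothetical, and the sketch assumes the queue message body has already been decoded to JSON text.

```python
import json
from typing import Optional

# Hypothetical BlobCreated event, trimmed to the fields used below.
# Account, container and path are made up for illustration.
SAMPLE_EVENT = json.dumps({
    "eventType": "Microsoft.Storage.BlobCreated",
    "subject": "/blobServices/default/containers/landing/blobs/data/15012/1/file-0001.json",
    "data": {
        "contentLength": 66560,
        "url": "https://myaccount.blob.core.windows.net/landing/data/15012/1/file-0001.json",
    },
})


def new_file_url(message_text: str, input_prefix: str) -> Optional[str]:
    """Return the blob URL if the message is a BlobCreated event under input_prefix."""
    event = json.loads(message_text)
    if event.get("eventType") != "Microsoft.Storage.BlobCreated":
        return None  # ignore deletions and other event types
    url = event["data"]["url"]
    return url if url.startswith(input_prefix) else None


prefix = "https://myaccount.blob.core.windows.net/landing/data/15012/1/"
print(new_file_url(SAMPLE_EVENT, prefix))
```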

Each file is 65 KB, and I have received 3 million file events.

My code to process the files is as follows:

from pyspark.sql.functions import input_file_name, lit

raw_payload_df = (
    spark.readStream
    .format("cloudFiles")
    # File notification mode: read new-file events from the Azure queue
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.queueName", queue_name)
    .option("cloudFiles.connectionString", queue_conn_string)
    .option("cloudFiles.fetchParallelism", 10)
    .option('cloudFiles.format', 'json')
    .option('cloudFiles.schemaLocation', schema_path)
    # Alternative rate limit (tested separately, see Problem below)
    #.option('cloudFiles.maxFilesPerTrigger', 50000)
    .option('cloudFiles.maxBytesPerTrigger', '10g')  # byte string must be quoted
    .option('multiline', 'true')
    .load(pl_path)
    .withColumn("FilePath", input_file_name())
    .withColumn("AppId", lit(app_id))
    .withColumn("SchemaId", lit(schema_id))
    .withColumn("SchemaVersion", lit(schema_version))
    .withColumn("Priority", lit(priority))
)

# obs_df is derived from raw_payload_df (the observedMetrics in the progress
# output suggest a DataFrame.observe() step); that step and forEachBatch are not shown.
payloadCompressStream = obs_df.writeStream.foreachBatch(forEachBatch) \
                              .trigger(availableNow=True) \
                              .option('checkpointLocation', checkpoint_path) \
                              .start()
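To put the configured limits in perspective, the arithmetic below estimates how many micro-batches the 3-million-file backlog would need if every batch were filled up to each limit. It assumes that 65 KB means 65 KiB and that the byte string `10g` means 10 GiB; both are simplifying assumptions. Both options are upper bounds, so Auto Loader is not obliged to fill every batch.

```python
import math

# Assumptions: 65 KB = 65 KiB per file, and "10g" = 10 GiB (10 * 1024**3 bytes).
FILE_SIZE_BYTES = 65 * 1024
TOTAL_FILES = 3_000_000
MAX_BYTES_PER_TRIGGER = 10 * 1024**3
MAX_FILES_PER_TRIGGER = 50_000


def batches_needed(files_per_batch: int, total_files: int = TOTAL_FILES) -> int:
    """Micro-batches required if every batch holds files_per_batch files."""
    return math.ceil(total_files / files_per_batch)


# How many whole 65 KiB files fit under a 10 GiB byte limit
files_per_10g_batch = MAX_BYTES_PER_TRIGGER // FILE_SIZE_BYTES

print("total GiB:", round(TOTAL_FILES * FILE_SIZE_BYTES / 1024**3, 1))
print("files per 10g batch:", files_per_10g_batch)
print("batches @ 10g:", batches_needed(files_per_10g_batch))
print("batches @ 50000 files:", batches_needed(MAX_FILES_PER_TRIGGER))
print("batches @ 5000 files:", batches_needed(5000))
print("batches @ 2000 files:", batches_needed(2000))
```

Under these assumptions, `10g` corresponds to about 161,000 files per batch (19 batches for the backlog) and 50,000 files to 60 batches, whereas the observed 2,000 to 5,000 files per batch works out to 600 to 1,500 batches.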

Problem

  • When I set cloudFiles.maxFilesPerTrigger to 50000, each batch processed only 2,000 to 5,000 files.
  • The same happened when I used cloudFiles.maxBytesPerTrigger set to 10g instead.
  • Each JSON file contains a single record, so I monitor the numInputRows attribute of each batch's progress to see how many files are processed per batch. Here is the progress output for one batch:
    {
      "id" : "d49e1e9c-ac58-4da0-8ccb-9aa1790a7e40",
      "runId" : "746a885c-c2d8-4ec8-91c6-6c7126c85e64",
      "name" : null,
      "timestamp" : "2024-05-16T09:36:49.295Z",
      "batchId" : 1319,
      "batchDuration" : 3349,
      "numInputRows" : 952,
      "inputRowsPerSecond" : 161.19200812732814,
      "processedRowsPerSecond" : 284.2639593908629,
      "durationMs" : {
        "addBatch" : 2627,
        "commitOffsets" : 153,
        "getBatch" : 42,
        "latestOffset" : 352,
        "queryPlanning" : 9,
        "triggerExecution" : 3349,
        "walCommit" : 147
      },
      "stateOperators" : [ ],
      "sources" : [ {
        "description" : "CloudFilesSource[abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/]",
        "startOffset" : {
          "seqNum" : 7835887,
          "sourceVersion" : 1,
          "lastBackfillStartTimeMs" : 1715782085477,
          "lastBackfillFinishTimeMs" : 1715782478832,
          "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/"
        },
        "endOffset" : {
          "seqNum" : 7837532,
          "sourceVersion" : 1,
          "lastBackfillStartTimeMs" : 1715782085477,
          "lastBackfillFinishTimeMs" : 1715782478832,
          "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/"
        },
        "latestOffset" : null,
        "numInputRows" : 952,
        "inputRowsPerSecond" : 161.19200812732814,
        "processedRowsPerSecond" : 284.2639593908629,
        "metrics" : {
          "approximateQueueSize" : "0",
          "numBytesOutstanding" : "0",
          "numFilesOutstanding" : "0"
        }
      } ],
      "sink" : {
        "description" : "ForeachBatchSink",
        "numOutputRows" : -1
      },
      "observedMetrics" : {
        "15012_5f540a60-11bb-4db9-9246-14bb346f1ad2_compress" : {
          "PipelineRunID" : "fabf2d53-29d6-4ef6-ab7d-665f99227c9c",
          "BatchStatus" : "Fail",
          "AppId" : "5f540a60-11bb-4db9-9246-14bb346f1ad2",
          "SchemaId" : 15012,
          "SchemaVersion" : 1,
          "Priority" : 10,
          "InputRecordCount" : 952,
          "OutputRecordCount" : 952
        }
      }
    }
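Rather than reading each progress entry by hand, the per-batch file counts can be summarized from the list of progress dictionaries that `StreamingQuery.recentProgress` returns in PySpark. The helper `summarize_batches` is a hypothetical sketch; it relies only on the `numInputRows` and `sources[].metrics.numFilesOutstanding` fields that appear in the progress output above.

```python
from statistics import mean
from typing import Any, Dict, List


def summarize_batches(progress: List[Dict[str, Any]]) -> Dict[str, Any]:
    """Summarize input rows (= files, one record per file) per micro-batch.

    Expects at least one progress entry.
    """
    rows = [p["numInputRows"] for p in progress]
    # Auto Loader reports its backlog metrics as strings, e.g. "0"
    outstanding = [
        int(src.get("metrics", {}).get("numFilesOutstanding", 0))
        for p in progress
        for src in p.get("sources", [])
    ]
    return {
        "batches": len(rows),
        "min_rows": min(rows),
        "max_rows": max(rows),
        "mean_rows": mean(rows),
        "max_files_outstanding": max(outstanding, default=0),
    }


# Trimmed-down entry using values from the batch shown above
sample = [{"batchId": 1319, "numInputRows": 952,
           "sources": [{"metrics": {"numFilesOutstanding": "0"}}]}]
print(summarize_batches(sample))
```

In a notebook this could be called as `summarize_batches(payloadCompressStream.recentProgress)`. Note that `recentProgress` keeps only the most recent updates (bounded by `spark.sql.streaming.numRecentProgressUpdates`), so a long run may need periodic sampling or a `StreamingQueryListener`.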

Cluster Configuration

  • Driver: Standard_F32s_v2
  • Workers: Standard_F16s_v2, 30-50 workers
  • Databricks Runtime: 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)