Data Engineering

Databricks Autoloader File Notification Not Working As Expected

Sambit_S
New Contributor III

Hello Everyone,

In my project I am using Databricks Auto Loader to incrementally and efficiently process new data files as they arrive in cloud storage.

I am using file notification mode, with an Event Grid subscription and a queue set up in the Azure storage account that subscribes to file events from the input directory.

Each file is about 65 KB, and I have received 3 million file events.

My code to process the files is below.

from pyspark.sql.functions import input_file_name, lit

raw_payload_df = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.queueName", queue_name)
    .option("cloudFiles.connectionString", queue_conn_string)
    .option("cloudFiles.fetchParallelism", 10)
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", schema_path)
    # .option("cloudFiles.maxFilesPerTrigger", 50000)
    .option("cloudFiles.maxBytesPerTrigger", "10g")   # must be a string, e.g. "10g"
    .option("multiline", "true")
    .load(pl_path)
    .withColumn("FilePath", input_file_name())
    .withColumn("AppId", lit(app_id))
    .withColumn("SchemaId", lit(schema_id))
    .withColumn("SchemaVersion", lit(schema_version))
    .withColumn("Priority", lit(priority))
)
 
# obs_df is derived from raw_payload_df upstream (not shown in the post).
payloadCompressStream = (
    obs_df.writeStream
    .foreachBatch(forEachBatch)  # user-defined micro-batch handler
    .trigger(availableNow=True)
    .option("checkpointLocation", checkpoint_path)
    .start()
)
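
The forEachBatch handler referenced above is not included in the post. Purely for illustration, a minimal hypothetical stub of such a handler is sketched below; the actual implementation (compression, the observed metrics visible in the progress logs, etc.) is not shown, and output_path is a placeholder name.

# Hypothetical stub of the micro-batch handler; not the author's implementation.
def forEachBatch(batch_df, batch_id):
    # batch_df contains the records (one per JSON file) in this micro-batch.
    batch_df.write.mode("append").format("delta").save(output_path)  # placeholder path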

Problem

  • When I set the option cloudFiles.maxFilesPerTrigger to 50000, it only triggered 2,000 to 5,000 files per batch.
  • The same is true when I set the option cloudFiles.maxBytesPerTrigger to 10g.
  • One record corresponds to one JSON file here, and I monitor the numInputRows attribute of each batch to check how many files are processed per batch, as in the progress output below.
    { "id" : "d49e1e9c-ac58-4da0-8ccb-9aa1790a7e40", "runId" : "746a885c-c2d8-4ec8-91c6-6c7126c85e64", "name" : null, "timestamp" : "2024-05-16T09:36:49.295Z", "batchId" : 1319, "batchDuration" : 3349, "numInputRows" : 952, "inputRowsPerSecond" : 161.19200812732814, "processedRowsPerSecond" : 284.2639593908629, "durationMs" : { "addBatch" : 2627, "commitOffsets" : 153, "getBatch" : 42, "latestOffset" : 352, "queryPlanning" : 9, "triggerExecution" : 3349, "walCommit" : 147 }, "stateOperators" : [ ], "sources" : [ { "description" : "CloudFilesSource[abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/]", "startOffset" : { "seqNum" : 7835887, "sourceVersion" : 1, "lastBackfillStartTimeMs" : 1715782085477, "lastBackfillFinishTimeMs" : 1715782478832, "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/" }, "endOffset" : { "seqNum" : 7837532, "sourceVersion" : 1, "lastBackfillStartTimeMs" : 1715782085477, "lastBackfillFinishTimeMs" : 1715782478832, "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/" }, "latestOffset" : null, "numInputRows" : 952, "inputRowsPerSecond" : 161.19200812732814, "processedRowsPerSecond" : 284.2639593908629, "metrics" : { "approximateQueueSize" : "0", "numBytesOutstanding" : "0", "numFilesOutstanding" : "0" } } ], "sink" : { "description" : "ForeachBatchSink", "numOutputRows" : -1 }, "observedMetrics" : { "15012_5f540a60-11bb-4db9-9246-14bb346f1ad2_compress" : { "PipelineRunID" : "fabf2d53-29d6-4ef6-ab7d-665f99227c9c", "BatchStatus" : "Fail", "AppId" : "5f540a60-11bb-4db9-9246-14bb346f1ad2", "SchemaId" : 15012, "SchemaVersion" : 1, "Priority" : 10, "InputRecordCount" : 952, "OutputRecordCount" : 952 } } }

Cluster Configuration

Driver: Standard_F32s_v2 · Workers: Standard_F16s_v2 · 30-50 workers · 14.3 LTS (includes Apache Spark 3.5.0, Scala 2.12)

8 REPLIES

Sambit_S
New Contributor III

Hi Kaniz,

Thank you for your response.

I have read through all your points from the official documentation page.

Can you let me know how I can achieve the below?

  • I have 3 million JSON files of about 65 KB each, so the total size is roughly 186 GB. The data is in an ADLS Gen2 storage container, and the queue contains all the event notifications.
    [attachment: Sambit_S_2-1716288865124.png]
  • I want to process 10 GB of data files per Auto Loader micro-batch with file notifications enabled, so the total would come to around 20 batches.
  • What cluster configuration should I use, and what other options do I need to set besides cloudFiles.maxBytesPerTrigger = 10g?
  • The readStream code is as below.
    [attachment: Sambit_S_1-1716288754052.png — the readStream code]

matthew_m
Databricks Employee

Hi @Sambit_S,

File notification would only impact newly arriving files, so unless you have a backfill interval configured it will not load existing data. You could also backfill initially by using directory listing mode and then switch to file notification for newly arriving data. This can be done with the same checkpoint.
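
A rough sketch of both approaches, reusing the reader options from the original post (the backfillInterval value of "1 day" is illustrative; treat this as a sketch rather than a drop-in fix):

# Option A: stay in file notification mode, but let Auto Loader periodically
# backfill files that notifications may have missed.
df_notify = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.backfillInterval", "1 day")   # asynchronous backfill
    .option("cloudFiles.schemaLocation", schema_path)
    .load(pl_path)
)

# Option B: do the initial load in directory listing mode, then switch the same
# query (same checkpointLocation) to useNotifications = "true" for new files.
df_listing = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useNotifications", "false")   # directory listing mode
    .option("cloudFiles.schemaLocation", schema_path)
    .load(pl_path)
)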

The logs you shared show that there are no outstanding bytes or files, which could indicate that the existing data is not being loaded.

Let me know if it helps.

Sambit_S
New Contributor III

Hi matthew_m,

Please check my comments below and let me know if you find anything.

Sambit_S
New Contributor III

File notification would only impact newly arriving files

Yes, all 3 million files are newly arriving files, as I generate synthetic data files for performance testing.

The logs you shared show that there are no outstanding bytes or files.

I think the earlier log was from the last batch of the stream. If you look at the metric below, "numBytesOutstanding" : "2605258897" and "numFilesOutstanding" : "60310" seem wrong (as per my assumption), because the numbers increase and decrease as subsequent batches run.

{
  "id" : "d49e1e9c-ac58-4da0-8ccb-9aa1790a7e40",
  "runId" : "6d3ac39b-42c1-4bd8-99c7-8477817b278e",
  "name" : null,
  "timestamp" : "2024-05-21T14:16:10.155Z",
  "batchId" : 1478,
  "batchDuration" : 35581,
  "numInputRows" : 998,
  "inputRowsPerSecond" : 0.0,
  "processedRowsPerSecond" : 28.04867766504595,
  "durationMs" : {
    "addBatch" : 25634,
    "commitOffsets" : 202,
    "getBatch" : 246,
    "latestOffset" : 2295,
    "queryPlanning" : 5687,
    "triggerExecution" : 35568,
    "walCommit" : 180
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "CloudFilesSource[abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/]",
    "startOffset" : {
      "seqNum" : 18875879,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1715782085477,
      "lastBackfillFinishTimeMs" : 1715782478832,
      "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/"
    },
    "endOffset" : {
      "seqNum" : 18877451,
      "sourceVersion" : 1,
      "lastBackfillStartTimeMs" : 1715782085477,
      "lastBackfillFinishTimeMs" : 1715782478832,
      "lastInputPath" : "abfss://5f540a60-11bb-4db9-9246-14bb346f1ad2@dtmsplztestscudlsdvc001.dfs.core.windows.net/data/15012/1/"
    },
    "latestOffset" : null,
    "numInputRows" : 998,
    "inputRowsPerSecond" : 0.0,
    "processedRowsPerSecond" : 28.04867766504595,
    "metrics" : {
      "approximateQueueSize" : "5869492",
      "numBytesOutstanding" : "2605258897",
      "numFilesOutstanding" : "60310"
    }
  } ],
  "sink" : {
    "description" : "ForeachBatchSink",
    "numOutputRows" : -1
  },
  "observedMetrics" : {
    "15012_5f540a60-11bb-4db9-9246-14bb346f1ad2_compress" : {
      "PipelineRunID" : "5ba763c1-4e6f-4b8e-8555-9f42935fd6a3",
      "BatchStatus" : "Fail",
      "AppId" : "5f540a60-11bb-4db9-9246-14bb346f1ad2",
      "SchemaId" : 15012,
      "SchemaVersion" : 1,
      "Priority" : 10,
      "InputRecordCount" : 998,
      "OutputRecordCount" : 998
    }
  }
}

 

matthew_m
Databricks Employee

Hi @Sambit_S ,

From that last log I can see that it ingested 998 files, which is close to the default maxFilesPerTrigger of 1000; since that option is commented out in your code, the default appears to still apply. maxBytesPerTrigger and maxFilesPerTrigger work together when defining the soft upper bound, so whichever limit is reached first caps the micro-batch.
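
To illustrate the interplay: if cloudFiles.maxFilesPerTrigger is left at its documented default of 1000 files, it can cap a micro-batch long before a 10g byte limit is reached. A hedged sketch with both soft limits raised explicitly (the file count is illustrative, roughly 10 GB / 65 KB; given the queue throughput discussed later in the thread, this alone may not be sufficient):

# Sketch: raise both soft limits so the byte cap, not the default file cap,
# governs the micro-batch size (~10 GB / 65 KB per file ~ 160,000 files).
raw_payload_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.maxFilesPerTrigger", 200000)  # soft cap on files per batch
    .option("cloudFiles.maxBytesPerTrigger", "10g")   # soft cap on bytes per batch
    .option("cloudFiles.schemaLocation", schema_path)
    .load(pl_path)
)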

 

Sambit_S
New Contributor III

Hi matthew_m,

I have only set maxBytesPerTrigger to 10g, and the metrics for the next batch look like the screenshot below.

[attachment: Sambit_S_0-1716303042890.png — streaming progress for the next batch]

Sambit_S
New Contributor III

I found some documentation on Azure Queue Storage throughput limits, which might be the reason.

[attachment: Sambit_S_0-1716372928961.png — Azure Queue Storage scalability targets]

Any idea how we can maximize the throughput? Is there some Spark / Auto Loader configuration that can help increase the streaming batch size?
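
One option that may be worth experimenting with is cloudFiles.fetchParallelism, which controls how many threads pull notifications from the queue (it is already set to 10 in the original code). A sketch with an illustrative value; the per-queue service limit still applies, so this mainly helps when the consumer rather than the queue is the bottleneck:

# Sketch: fetch queue messages with more parallel threads. This does not lift
# the Azure storage queue's own throughput ceiling.
raw_payload_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.fetchParallelism", 32)        # illustrative value
    .option("cloudFiles.maxBytesPerTrigger", "10g")
    .option("cloudFiles.schemaLocation", schema_path)
    .load(pl_path)
)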

matthew_m
Databricks Employee

Hi @Sambit_S ,

I misread inputRows as inputFiles, which aren't the same thing. Considering the limitation on the Azure queue, if you are already at that limit then you may need to consider switching to an event source such as Kafka or Event Hubs to get better ingestion performance.
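
For reference, a hypothetical sketch of consuming events from an Azure Event Hubs namespace via its Kafka-compatible endpoint with Structured Streaming; EH_NAMESPACE, EH_CONN_STRING, and the topic name "file-events" are placeholders, not values from this thread:

# Hypothetical sketch: read from Event Hubs through its Kafka endpoint.
EH_NAMESPACE = "<eventhubs-namespace>"
EH_CONN_STRING = "<eventhubs-connection-string>"

jaas = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    f'username="$ConnectionString" password="{EH_CONN_STRING}";'
)

events_df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", f"{EH_NAMESPACE}.servicebus.windows.net:9093")
    .option("kafka.security.protocol", "SASL_SSL")
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", jaas)  # shaded class name on Databricks
    .option("subscribe", "file-events")
    .option("startingOffsets", "earliest")
    .load()
)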
