DLT Pipeline OOM issue
03-14-2025 04:04 AM - edited 03-14-2025 04:16 AM
Hi,
I am seeing performance issues in one of my pipelines: a run now takes 5 hours even when there is no new data, where it used to take 1 hour. Performance seems to keep degrading as the volume of the source grows. My setup is below.
- The source is a folder of parquet files that contains every file from the first day up to today, so there are more than 10 million files in the source.
- Each parquet file holds a single set of data, so some files contain 1 row and others contain multiple rows.
- We receive the parquet files from the downstream system in our source folder.
- We have a DLT pipeline that ingests data from the source into streaming tables.
- We load around 5 tables in a single pipeline. Each table reads data from its corresponding parquet source.
- No parquet file is larger than 20KB.
- The cluster has 1 driver (64GB) and 4 worker nodes (64GB, 8 cores each).
- Code:
import dlt

for source in sources:
    @dlt.table(name=source.target_name)
    def parquet_target():
        return (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .option("cloudFiles.backfillInterval", "1 day")
            .option("cloudFiles.includeExistingFiles", True)
            .option("cloudFiles.inferColumnTypes", False)
            .option("cloudFiles.maxFilesPerTrigger", 1000)
            .option("cloudFiles.partitionColumns", [])
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
            .option("cloudFiles.schemaHints", source.target_schema)
            .load(source.path)
        )
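A side note on the loop above, separate from the OOM itself: a function defined inside a `for` loop reads the loop variable when it is called, not when it is defined. The sketch below shows the effect with a stand-in `table` decorator that only stores the function for a later call. That decorator, the `registry` dict, and the source names are hypothetical and are not the `dlt` API; whether the same thing affects a real pipeline depends on when the runtime invokes the decorated function, which is an assumption here.

```python
# Stand-in for a decorator that registers a function now and calls it later.
# This is NOT the dlt API; it only mimics deferred invocation.
registry = {}

def table(name):
    def register(fn):
        registry[name] = fn
        return fn
    return register

sources = ["orders", "customers", "payments"]  # hypothetical source names

# Pattern from the post: the body looks up `source` only when it is called.
for source in sources:
    @table(name=f"loop_{source}")
    def parquet_target():
        return source

# Factory pattern: each call to make_table gets its own `source` binding.
def make_table(source):
    @table(name=f"factory_{source}")
    def parquet_target():
        return source

for source in sources:
    make_table(source)

# Call everything after the loops have finished, as a deferred runtime would.
loop_results = [registry[f"loop_{s}"]() for s in sources]
factory_results = [registry[f"factory_{s}"]() for s in sources]
print(loop_results)     # every entry is the last source
print(factory_results)  # one entry per source
```

The decorator argument (`name=...`) is evaluated at definition time, so the table names come out right in both patterns; only values read inside the function body, such as `source.path` in the post, are affected. Wrapping the definition in a helper like `make_table` is the usual way to rule this out.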
I checked the Spark UI, and most of the time seems to be spent during Databricks initialization.
Also, if we process more files, the run fails with a memory exception:
Executor got terminated abnormally due to OUT_OF_MEMORY.
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
    at java.lang.StringBuffer.append(StringBuffer.java:276)
    at java.net.URI.appendAuthority(URI.java:1897)
    at java.net.URI.appendSchemeSpecificPart(URI.java:1935)
    at java.net.URI.toString(URI.java:1967)
    at java.net.URI.<init>(URI.java:742)
    at org.apache.hadoop.fs.Path.makeQualified(Path.java:575)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:1383)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:1309)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:1267)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listStatus(AzureBlobFileSystem.java:735)
    at com.databricks.common.filesystem.LokiFileSystem.listStatus(LokiFileSystem.scala:306)
    at com.databricks.sql.acl.fs.volumes.UCVolumesFileSystem.listStatus(UCVolumesFileSystem.scala:231)
    at com.databricks.backend.daemon.data.client.DBFSV2.$anonfun$listStatusAsIterator$2(DatabricksFileSystemV2.scala:350)
    at com.databricks.backend.daemon.data.client.DBFSV2$$Lambda$3874/209720936.apply(Unknown Source)
    at com.databricks.s3a.S3AExceptionUtils$.convertAWSExceptionToJavaIOException(DatabricksStreamUtils.scala:66)
    at com.databricks.backend.daemon.data.client.DBFSV2.$anonfun$listStatusAsIterator$1(DatabricksFileSystemV2.scala:306)
    at com.databricks.backend.daemon.data.client.DBFSV2$$Lambda$3869/1234745603.apply(Unknown Source)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:528)
    at com.databricks.logging.UsageLogging$$Lambda$2797/1369252947.apply(Unknown Source)
    at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:633)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:656)
    at com.databricks.logging.UsageLogging$$Lambda$2800/846620173.apply(Unknown Source)
    at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
    at com.databricks.logging.AttributionContextTracing$$Lambda$1061/2048906772.apply(Unknown Source)
    at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
    at com.databricks.logging.AttributionContext$$$Lambda$1062/873554165.apply(Unknown Source)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
    at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
03-16-2025 09:48 PM
Hi BricksGuy,
As I understand it, your pipeline is slowing down because it has to deal with too many small parquet files (over 10 million), which causes high metadata overhead and memory pressure. Since every file has to be listed and opened, performance degrades as the file count grows. To fix this, you could compact the small files before ingestion, either with a Spark job that merges them or, if the data is stored as Delta, with the OPTIMIZE command (OPTIMIZE works on Delta tables, not on a raw parquet folder). Also, try increasing cloudFiles.maxFilesPerTrigger (e.g., from 1000 to 5000) to process more files per batch without overwhelming memory. Partitioning your source data by date or another key can also help reduce file scanning time. Since you're getting GC overhead limit exceeded errors, you might need to increase cluster memory or tune memory overhead settings. Enabling Photon could also speed up processing. The key fix here is to reduce the number of small files. Let me know if you need help setting that up!
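To put rough numbers on the small-file problem, here is a back-of-the-envelope sketch using the figures from the post (10 million files, at most 20KB each, 1000 files per trigger). The 128 MiB target file size, the function names, and the parameters are illustrative assumptions. `compact_folder` is only an outline of the "Spark job to merge them" idea: it takes an existing SparkSession and has not been run against this data.

```python
import math

def estimate(file_count, max_file_bytes, files_per_trigger, target_file_bytes):
    """Upper-bound estimates for a folder of small files."""
    total_bytes = file_count * max_file_bytes
    return {
        # Upper bound, since each file is "not even more than" max_file_bytes.
        "total_gib_upper_bound": total_bytes / 1024**3,
        # Micro-batches needed to work through every file once.
        "micro_batches_for_backfill": math.ceil(file_count / files_per_trigger),
        # File count if the same data were rewritten at the target size.
        "files_after_compaction": math.ceil(total_bytes / target_file_bytes),
    }

def compact_folder(spark, src_path, dst_path, num_files):
    """Rewrite the parquet files under src_path as num_files larger files.

    Assumption: src_path is small enough to list and read in one job,
    for example one sub-folder at a time rather than all 10 million files.
    """
    (spark.read.parquet(src_path)
          .repartition(num_files)
          .write.mode("overwrite")
          .parquet(dst_path))

# Figures from the post; 128 MiB is an assumed target size.
plan = estimate(
    file_count=10_000_000,
    max_file_bytes=20 * 1024,
    files_per_trigger=1000,
    target_file_bytes=128 * 1024**2,
)
print(plan)
```

With these inputs the whole source is under roughly 191 GiB, takes 10,000 micro-batches to read at 1000 files per trigger, and would fit in about 1,526 files at the assumed target size. One caveat: as far as I know, Auto Loader tracks processed files by path, so compacted output written to a new folder would look like new files to a stream pointed at it. Plan for that before switching the source.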
Regards,
Brahma

