DLT Pipeline OOM issue
Hi,
I am seeing performance issues in one of my pipelines: it now takes 5 hours to run even when there is no new data, where it used to take 1 hour. The performance seems to keep degrading as the volume of the source grows. My setup is below.
- The source is a folder of parquet files and contains every file from the initial days up to today, so there are more than 10 million files in the source.
- Each parquet file is a single set of data, meaning some contain 1 row and some contain multiple rows.
- We receive the parquet files from an upstream system into our source folder.
- We have a DLT pipeline that ingests data from the source into streaming tables.
- We load around 5 tables in a single pipeline; each table reads from its corresponding parquet source.
- No parquet file is larger than 20 KB.
- The cluster is 1 driver (64 GB) and 4 worker nodes (64 GB, 8 cores each).
- Code:
import dlt

for source in sources:
    @dlt.table(name=source.target_name)
    def parquet_target():
        return (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .option("cloudFiles.backfillInterval", "1 day")
            .option("cloudFiles.includeExistingFiles", True)
            .option("cloudFiles.inferColumnTypes", False)
            .option("cloudFiles.maxFilesPerTrigger", 1000)
            .option("cloudFiles.partitionColumns", [])
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
            .option("cloudFiles.schemaHints", source.target_schema)
            .load(source.path)
        )
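(A side note on the loop above: Python closures capture the loop variable by reference, so every parquet_target defined this way may end up reading the last source's path when DLT later invokes the functions. A minimal sketch of the usual factory-function binding, assuming the same sources objects as above:)

import dlt

def make_parquet_table(source):
    # The factory binds this iteration's `source`, so the inner
    # function no longer shares the loop variable with the other tables.
    @dlt.table(name=source.target_name)
    def parquet_target():
        return (
            spark.readStream
            .format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .option("cloudFiles.schemaHints", source.target_schema)
            .load(source.path)
        )
    return parquet_target

for source in sources:
    make_parquet_table(source)

(With the factory, each table keeps its own source bound at definition time.)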
I checked the Spark UI, and the time appears to be spent during initialization in Databricks, before any tasks run.
Also, if we process more files, the pipeline fails with a memory exception:
Executor got terminated abnormally due to OUT_OF_MEMORY.
java.lang.OutOfMemoryError: GC overhead limit exceeded
    at java.util.Arrays.copyOf(Arrays.java:3332)
    at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
    at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
    at java.lang.StringBuffer.append(StringBuffer.java:276)
    at java.net.URI.appendAuthority(URI.java:1897)
    at java.net.URI.appendSchemeSpecificPart(URI.java:1935)
    at java.net.URI.toString(URI.java:1967)
    at java.net.URI.<init>(URI.java:742)
    at org.apache.hadoop.fs.Path.makeQualified(Path.java:575)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:1383)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:1309)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:1267)
    at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listStatus(AzureBlobFileSystem.java:735)
    at com.databricks.common.filesystem.LokiFileSystem.listStatus(LokiFileSystem.scala:306)
    at com.databricks.sql.acl.fs.volumes.UCVolumesFileSystem.listStatus(UCVolumesFileSystem.scala:231)
    at com.databricks.backend.daemon.data.client.DBFSV2.$anonfun$listStatusAsIterator$2(DatabricksFileSystemV2.scala:350)
    at com.databricks.backend.daemon.data.client.DBFSV2$$Lambda$3874/209720936.apply(Unknown Source)
    at com.databricks.s3a.S3AExceptionUtils$.convertAWSExceptionToJavaIOException(DatabricksStreamUtils.scala:66)
    at com.databricks.backend.daemon.data.client.DBFSV2.$anonfun$listStatusAsIterator$1(DatabricksFileSystemV2.scala:306)
    at com.databricks.backend.daemon.data.client.DBFSV2$$Lambda$3869/1234745603.apply(Unknown Source)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:528)
    at com.databricks.logging.UsageLogging$$Lambda$2797/1369252947.apply(Unknown Source)
    at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:633)
    at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:656)
    at com.databricks.logging.UsageLogging$$Lambda$2800/846620173.apply(Unknown Source)
    at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
    at com.databricks.logging.AttributionContextTracing$$Lambda$1061/2048906772.apply(Unknown Source)
    at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
    at com.databricks.logging.AttributionContext$$$Lambda$1062/873554165.apply(Unknown Source)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
    at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
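(A note on the trace: it fails inside AzureBlobFileSystemStore.listStatus, i.e. while listing the source folder, which is where Auto Loader's default directory-listing discovery gets expensive with 10M+ small files. A minimal sketch of one commonly suggested mitigation, file-notification mode plus a bounded tracked file age; the table name, path, and option values below are placeholders to adapt, not tested settings:)

import dlt

@dlt.table(name="example_target")  # placeholder table name
def parquet_target():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        # Discover new files from storage events instead of listing
        # the whole multi-million-file folder on every update.
        .option("cloudFiles.useNotifications", True)
        # Bound the tracked file age so the stream's state stops growing;
        # assumes files older than this are already ingested.
        .option("cloudFiles.maxFileAge", "14 days")
        # Placeholder path:
        .load("abfss://container@account.dfs.core.windows.net/source/path")
    )

(File-notification mode requires the Azure-side event setup, an Event Grid subscription and queue, plus permissions to create them, so it is a configuration change rather than a drop-in option.)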

