
DLT Pipeline OOM issue

BricksGuy
New Contributor III

Hi,
I am seeing performance issues in one of my pipelines: it now takes 5 hours to run even when there is no new data, where it previously took 1 hour. The performance seems to keep degrading as the volume of the source grows. My setup is below.

  1. The source is a folder of parquet files and contains every parquet file from the initial days up to today, so there are more than 10 million files in the source.
  2. Each parquet file is a single set of data, meaning some contain 1 row and some contain multiple rows.
  3. We receive the parquet files from another system into our source folder.
  4. We have a DLT pipeline that ingests data from the source into streaming tables.
  5. We load around 5 tables in a single pipeline, and each table reads data from its corresponding parquet source.
  6. No parquet file is larger than about 20 KB.
  7. Cluster details: 1 driver (64 GB) with 4 worker nodes (64 GB, 8 cores each).
  8. Code:
import dlt

for source in sources:
    # `source=source` binds the loop variable for each table; otherwise every
    # table function would resolve against the last `source` when DLT evaluates it.
    @dlt.table(name=source.target_name)
    def parquet_target(source=source):
        return (
            spark.readStream.format("cloudFiles")
            .option("cloudFiles.format", "parquet")
            .option("cloudFiles.backfillInterval", "1 day")
            .option("cloudFiles.includeExistingFiles", True)
            .option("cloudFiles.inferColumnTypes", False)
            .option("cloudFiles.maxFilesPerTrigger", 1000)
            .option("cloudFiles.partitionColumns", [])
            .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
            .option("cloudFiles.schemaHints", source.target_schema)
            .load(source.path)
        )
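
For context on the options above: with directory-listing discovery (the default), new files are found by listing the source folder, which is what becomes expensive once there are 10M+ small files. A notification-mode variant of the same reader would look roughly like the sketch below; this is only a sketch, and it assumes the Event Grid / storage-queue setup that Auto Loader file notifications need on Azure, which is not shown here.

# Sketch only: same stream, but file discovery via cloud notifications instead of
# directory listing. Assumes the storage account already has the Event Grid / queue
# wiring that cloudFiles.useNotifications relies on (not shown here).
notification_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.includeExistingFiles", True)
    .option("cloudFiles.schemaHints", source.target_schema)
    .load(source.path)
)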

I checked the Spark UI, and it looks like the time is being spent during Databricks initialization.
Also, if we process more files, the pipeline fails with a memory exception.

Executor got terminated abnormally due to OUT_OF_MEMORY. 

java.lang.OutOfMemoryError: GC overhead limit exceeded
	at java.util.Arrays.copyOf(Arrays.java:3332)
	at java.lang.AbstractStringBuilder.ensureCapacityInternal(AbstractStringBuilder.java:124)
	at java.lang.AbstractStringBuilder.append(AbstractStringBuilder.java:448)
	at java.lang.StringBuffer.append(StringBuffer.java:276)
	at java.net.URI.appendAuthority(URI.java:1897)
	at java.net.URI.appendSchemeSpecificPart(URI.java:1935)
	at java.net.URI.toString(URI.java:1967)
	at java.net.URI.<init>(URI.java:742)
	at org.apache.hadoop.fs.Path.makeQualified(Path.java:575)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:1383)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:1309)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.listStatus(AzureBlobFileSystemStore.java:1267)
	at shaded.databricks.azurebfs.org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem.listStatus(AzureBlobFileSystem.java:735)
	at com.databricks.common.filesystem.LokiFileSystem.listStatus(LokiFileSystem.scala:306)
	at com.databricks.sql.acl.fs.volumes.UCVolumesFileSystem.listStatus(UCVolumesFileSystem.scala:231)
	at com.databricks.backend.daemon.data.client.DBFSV2.$anonfun$listStatusAsIterator$2(DatabricksFileSystemV2.scala:350)
	at com.databricks.backend.daemon.data.client.DBFSV2$$Lambda$3874/209720936.apply(Unknown Source)
	at com.databricks.s3a.S3AExceptionUtils$.convertAWSExceptionToJavaIOException(DatabricksStreamUtils.scala:66)
	at com.databricks.backend.daemon.data.client.DBFSV2.$anonfun$listStatusAsIterator$1(DatabricksFileSystemV2.scala:306)
	at com.databricks.backend.daemon.data.client.DBFSV2$$Lambda$3869/1234745603.apply(Unknown Source)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperation$1(UsageLogging.scala:528)
	at com.databricks.logging.UsageLogging$$Lambda$2797/1369252947.apply(Unknown Source)
	at com.databricks.logging.UsageLogging.executeThunkAndCaptureResultTags$1(UsageLogging.scala:633)
	at com.databricks.logging.UsageLogging.$anonfun$recordOperationWithResultTags$4(UsageLogging.scala:656)
	at com.databricks.logging.UsageLogging$$Lambda$2800/846620173.apply(Unknown Source)
	at com.databricks.logging.AttributionContextTracing.$anonfun$withAttributionContext$1(AttributionContextTracing.scala:48)
	at com.databricks.logging.AttributionContextTracing$$Lambda$1061/2048906772.apply(Unknown Source)
	at com.databricks.logging.AttributionContext$.$anonfun$withValue$1(AttributionContext.scala:276)
	at com.databricks.logging.AttributionContext$$$Lambda$1062/873554165.apply(Unknown Source)
	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
	at com.databricks.logging.AttributionContext$.withValue(AttributionContext.scala:272)
	at com.databricks.logging.AttributionContextTracing.withAttributionContext(AttributionContextTracing.scala:46)
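
Reading the trace, the frames are all directory listing (AzureBlobFileSystemStore.listStatus via the UC Volumes filesystem), so the GC pressure looks like it comes from enumerating the 10M+ file paths rather than from the file contents themselves. Below is a sketch of the same reader with two standard Auto Loader options that limit how much listing and tracking state each run carries; it is only a sketch, and it assumes the files land under lexically ordered paths (required for incremental listing) and that files older than the chosen window never need to be picked up again.

# Sketch only: listing/state-limiting options on the same stream.
# Assumptions: file paths are lexically ordered (needed for incremental listing)
# and files older than 30 days can be dropped from the stream's tracking state.
bounded_stream = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.useIncrementalListing", "true")  # re-list only past the last seen file
    .option("cloudFiles.maxFileAge", "30 days")          # expire old entries from checkpoint state
    .option("cloudFiles.schemaHints", source.target_schema)
    .load(source.path)
)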

 

 
