Data Engineering

Databricks OutOfMemory error on code that previously worked without issue

shanebo425
New Contributor III

I have a notebook in Azure Databricks that does some transformations on a bronze tier table and inserts the transformed data into a silver tier table. This notebook is used to do an initial load of the data from our existing system into our new data lake. The section at issue converts a list of columns from a bronze table and inserts them as rows into a silver table. I've been running this code for weeks without issue: same code, same data, same cluster configuration (no changes). Suddenly, I am consistently receiving an `OutOfMemory` error when I execute this particular section of the notebook. The data is not large, and I'm only inserting a dataframe with 6 columns.

I've tried numerous approaches (both code and server configuration) to fix the issue with no luck. Hoping someone can help me out. The original source code is below.


```
# Imports used by the notebook code below
from datetime import datetime, timezone

from delta.tables import DeltaTable
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, expr, lit

dfCallData = fetchCallSummaryData(useArchive=False) \
    .join(_callMap, on=(col('UniqueCallID') == _callMap.systemCallId) & (_callMap.systemType != lit('NOVA')), how='inner') \
    .select(col('UniqueCallId'), col('tenantId'), col('agentId'), col('channelType'), col('callerPhone'), col('callerName'), col('terminationType'),
            col('callDuration'), col('transferDuration'), col('callOutcome'), col('transferReason'), col('endEvent'), col('lastStepName'), col('reportDate'),
            col('callInitiatedOn'), col('lastSystemUpdateOn'), col('isFirstTimeCaller'), col('isAfterHours'), col('finalStatus'), col('clk.isArchived'),
            col('clk.archivedOn'), col('Conversation'), col('QueueDuration'), col('AgentDuration'), col('TransferTalkDuration'), col('WhisperDuration'),
            col('BillingRatePerIVRMinute'), col('BillingRatePerTransferMinute'), col('BillingRatePerCall'), col('BillingGracePeriod'), col('CustomerData'),
            col('CustomerDataFlattened'), col('RunMode'), col('SurveyResults'), col('RedactedConversation'), col('Disposition'), col('Col1'), col('Col2'),
            col('Col3'), col('Col4'), col('Col5'), col('Col6'), col('Col7'), col('Col8'), col('Pub1'), col('Pub2'), col('Pub3'), col('Pub4'), col('Pub5'),
            col('Pub6'), col('Pub7'), col('Pub8'), col('PubBreadCrumbs'), col('PubFlattened'), col('PubLastBreadCrumb'), col('PubIntent'),
            col('PubAuthenticated'), col('Transcript'), col('clk.callId'))

pub_cols = ['Pub1', 'Pub2', 'Pub3', 'Pub4', 'Pub5', 'Pub6', 'Pub7', 'Pub8']

# Original method signature
def processMappedCallData(columns_to_convert: list) -> DataFrame:
    dfNewMap = spark.read.table('silver.appdata.call_data_map') \
        .where(col('mapType') == 'columns') \
        .alias('dm')

    dfCallDataSub = dfCallData.select('callId', 'UniqueCallId', col('agentId'), *columns_to_convert) \
        .alias('cd')

    # Build one slice per legacy column and union the slices into a single dataframe
    dfData = None
    for c in columns_to_convert:
        df = dfCallDataSub.join(dfNewMap, (col('cd.agentId') == col('dm.agentId')) & (col('dm.mapKey') == c), 'inner') \
            .where((col(f'cd.{c}').isNotNull()) & (col(f'cd.{c}') != '')) \
            .withColumn('callDataId', lit(None)) \
            .withColumn('callDataType', lit('columns:mapped')) \
            .select('callDataId', 'cd.callId', 'dm.callDataMapId', 'callDataType', lit(c).alias('legacyColumn'),
                    col(f'cd.{c}').alias('dataValue'))

        dfData = dfData.union(df) if dfData is not None else df

    return dfData

dfPubCols = processMappedCallData(pub_cols)
_pipeline.execute_call_data_pipeline(dfPubCols, callDataType='columns')


# Upsert (method on the pipeline class)
def execute_call_data_pipeline(self, dfMappedData: DataFrame, callDataType='columns:mapped'):
    dtCallData = DeltaTable.forName(self._spark, f'{self.get_catalog()}.{self.get_schema()}.call_data')
    dtCallData.alias('old').merge(
        source=dfMappedData.alias('new'),
        condition=expr('old.callDataId = new.callDataId')
    ).whenMatchedUpdate(set=
        {
            'callId': col('new.callId') if 'callId' in dfMappedData.columns else col('old.callId'),
            'callDataMapId': col('new.callDataMapId') if 'callDataMapId' in dfMappedData.columns else col('old.callDataMapId'),
            'callDataType': col('new.callDataType') if 'callDataType' in dfMappedData.columns else col('old.callDataType'),
            'legacyColumn': col('new.legacyColumn') if 'legacyColumn' in dfMappedData.columns else col('old.legacyColumn'),
            'dataValue': col('new.dataValue') if 'dataValue' in dfMappedData.columns else col('old.dataValue'),
            'isEncrypted': col('new.isEncrypted') if 'isEncrypted' in dfMappedData.columns else col('old.isEncrypted'),
            'silverUpdateOn': lit(datetime.now(timezone.utc).timestamp())
        }
    ).whenNotMatchedInsert(values=
        {
            'callId': col('new.callId'),
            'callDataMapId': col('new.callDataMapId') if 'callDataMapId' in dfMappedData.columns else lit(None),
            'callDataType': col('new.callDataType') if 'callDataType' in dfMappedData.columns else lit(callDataType),
            'legacyColumn': col('new.legacyColumn') if 'legacyColumn' in dfMappedData.columns else lit(None),
            'dataValue': col('new.dataValue'),
            'isEncrypted': col('new.isEncrypted') if 'isEncrypted' in dfMappedData.columns else lit(False),
            'silverCreateOn': lit(datetime.now(timezone.utc).timestamp())
        }
    ).execute()
```

As one example of the changes I made, here is a modified version of the `processMappedCallData()` method that breaks the data into multiple upsert calls of smaller chunks (~300M rows each) rather than a single upsert of one large dataframe of ~2.4B rows. Both the original and this version fail with the same error: `java.lang.OutOfMemoryError: Java heap space`

```
def processMappedCallData(columns_to_convert: list):
    dfNewMap = spark.read.table('silver.appdata.call_data_map') \
        .where(col('mapType') == 'columns') \
        .alias('dm')

    dfCallDataSub = dfCallData.select('callId', 'UniqueCallId', col('agentId'), *columns_to_convert) \
        .alias('cd')

    dfData = None
    for c in columns_to_convert:
        df = dfCallDataSub.join(dfNewMap, (col('cd.agentId') == col('dm.agentId')) & (col('dm.mapKey') == c), 'inner') \
            .where((col(f'cd.{c}').isNotNull()) & (col(f'cd.{c}') != '')) \
            .withColumn('callDataId', lit(None)) \
            .withColumn('callDataType', lit('columns:mapped')) \
            .select('callDataId', 'cd.callId', 'dm.callDataMapId', 'callDataType', lit(c).alias('legacyColumn'),
                    col(f'cd.{c}').alias('dataValue'))
        # Upsert each column's slice immediately instead of unioning everything first
        _pipeline.execute_call_data_pipeline(df, callDataType='columns')
```
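
For what it's worth, the per-column loop is effectively an unpivot, so an equivalent single-pass version using `stack()` would look roughly like the sketch below (untested at this scale, reusing `dfCallDataSub` and `dfNewMap` from the function above, and assuming the Pub columns all share the same type). It keeps the whole conversion in a single plan instead of a union of eight separate joins.

```
# Sketch only (not verified at this scale): unpivot the Pub columns in one pass with stack()
stack_args = ', '.join(f"'{c}', `{c}`" for c in pub_cols)

dfUnpivoted = dfCallDataSub.selectExpr(
        'callId', 'agentId',
        f'stack({len(pub_cols)}, {stack_args}) as (legacyColumn, dataValue)') \
    .where(col('dataValue').isNotNull() & (col('dataValue') != '')) \
    .alias('up')

dfData = dfUnpivoted.join(
        dfNewMap,
        (col('up.agentId') == col('dm.agentId')) & (col('dm.mapKey') == col('up.legacyColumn')),
        'inner') \
    .withColumn('callDataId', lit(None)) \
    .withColumn('callDataType', lit('columns:mapped')) \
    .select('callDataId', 'up.callId', 'dm.callDataMapId', 'callDataType', 'up.legacyColumn', 'up.dataValue')
```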

1 REPLY

jose_gonzalez
Moderator

Please review the Spark UI from the old job execution versus the new job execution. You might also want to check whether the data volume has increased, as that could be the reason for the OOM.
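
For example, a quick way to confirm whether the input volume has grown (hypothetical snippet, reusing the `fetchCallSummaryData()` helper from your post) would be something like:

```
# Hypothetical quick check: compare today's input volume against what the job used to process
dfBronze = fetchCallSummaryData(useArchive=False)
print(f'Bronze call summary rows: {dfBronze.count()}')
print(f'Input partitions: {dfBronze.rdd.getNumPartitions()}')
```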
