Databricks OutOfMemory error on code that previously worked without issue

shanebo425
New Contributor II

I have a notebook in Azure Databricks that does some transformations on a bronze-tier table and inserts the transformed data into a silver-tier table. This notebook is used to do an initial load of the data from our existing system into our new data lake. The section at issue converts a list of columns from a bronze table and inserts them as rows into a silver table. I've been running this code for weeks without issue: same code, same data, same cluster configuration (no changes). Suddenly, I am consistently receiving an `OutOfMemory` error when I execute this particular section of the notebook. The data is not large, and I'm only inserting a dataframe with 6 columns.

I've tried numerous approaches (both code and server configuration) to fix the issue with no luck. Hoping someone can help me out. The original source code is below.


```
from datetime import datetime, timezone

from delta.tables import DeltaTable
from pyspark.sql import DataFrame
from pyspark.sql.functions import col, expr, lit

# fetchCallSummaryData, _callMap, _pipeline, and spark are defined elsewhere in the notebook.
dfCallData = fetchCallSummaryData(useArchive=False) \
    .join(_callMap, on=(col('UniqueCallID') == _callMap.systemCallId) & (_callMap.systemType != lit('NOVA')), how='inner') \
    .select(col('UniqueCallId'), col('tenantId'), col('agentId'), col('channelType'), col('callerPhone'), col('callerName'), col('terminationType'),
            col('callDuration'), col('transferDuration'), col('callOutcome'), col('transferReason'), col('endEvent'), col('lastStepName'), col('reportDate'),
            col('callInitiatedOn'), col('lastSystemUpdateOn'), col('isFirstTimeCaller'), col('isAfterHours'), col('finalStatus'), col('clk.isArchived'),
            col('clk.archivedOn'), col('Conversation'), col('QueueDuration'), col('AgentDuration'), col('TransferTalkDuration'), col('WhisperDuration'),
            col('BillingRatePerIVRMinute'), col('BillingRatePerTransferMinute'), col('BillingRatePerCall'), col('BillingGracePeriod'), col('CustomerData'),
            col('CustomerDataFlattened'), col('RunMode'), col('SurveyResults'), col('RedactedConversation'), col('Disposition'), col('Col1'), col('Col2'),
            col('Col3'), col('Col4'), col('Col5'), col('Col6'), col('Col7'), col('Col8'), col('Pub1'), col('Pub2'), col('Pub3'), col('Pub4'), col('Pub5'),
            col('Pub6'), col('Pub7'), col('Pub8'), col('PubBreadCrumbs'), col('PubFlattened'), col('PubLastBreadCrumb'), col('PubIntent'),
            col('PubAuthenticated'), col('Transcript'), col('clk.callId'))

pub_cols = ['Pub1', 'Pub2', 'Pub3', 'Pub4', 'Pub5', 'Pub6', 'Pub7', 'Pub8']

# Original method signature
def processMappedCallData(columns_to_convert: list) -> DataFrame:
    dfNewMap = spark.read.table('silver.appdata.call_data_map') \
        .where(col('mapType') == 'columns') \
        .alias('dm')

    dfCallDataSub = dfCallData.select('callId', 'UniqueCallId', col('agentId'), *columns_to_convert) \
        .alias('cd')

    # Unpivot: one output row per non-empty value in each mapped column
    dfData = None
    for c in columns_to_convert:
        df = dfCallDataSub.join(dfNewMap, (col('cd.agentId') == col('dm.agentId')) & (col('dm.mapKey') == c), 'inner') \
            .where((col(f'cd.{c}').isNotNull()) & (col(f'cd.{c}') != '')) \
            .withColumn('callDataId', lit(None)) \
            .withColumn('callDataType', lit('columns:mapped')) \
            .select('callDataId', 'cd.callId', 'dm.callDataMapId', 'callDataType', lit(c).alias('legacyColumn'),
                    col(f'cd.{c}').alias('dataValue'))

        dfData = dfData.union(df) if dfData is not None else df

    return dfData

dfPubCols = processMappedCallData(pub_cols)
_pipeline.execute_call_data_pipeline(dfPubCols, callDataType='columns')


# Upsert
def execute_call_data_pipeline(self, dfMappedData: DataFrame, callDataType='columns:mapped'):
    dtCallData = DeltaTable.forName(self._spark, f'{self.get_catalog()}.{self.get_schema()}.call_data')
    dtCallData.alias('old').merge(
        source=dfMappedData.alias('new'),
        condition=expr('old.callDataId = new.callDataId')
    ).whenMatchedUpdate(set=
        {
            'callId': col('new.callId') if 'callId' in dfMappedData.columns else col('old.callId'),
            'callDataMapId': col('new.callDataMapId') if 'callDataMapId' in dfMappedData.columns else col('old.callDataMapId'),
            'callDataType': col('new.callDataType') if 'callDataType' in dfMappedData.columns else col('old.callDataType'),
            'legacyColumn': col('new.legacyColumn') if 'legacyColumn' in dfMappedData.columns else col('old.legacyColumn'),
            'dataValue': col('new.dataValue') if 'dataValue' in dfMappedData.columns else col('old.dataValue'),
            'isEncrypted': col('new.isEncrypted') if 'isEncrypted' in dfMappedData.columns else col('old.isEncrypted'),
            'silverUpdateOn': lit(datetime.now(timezone.utc).timestamp())
        }
    ).whenNotMatchedInsert(values=
        {
            'callId': col('new.callId'),
            'callDataMapId': col('new.callDataMapId') if 'callDataMapId' in dfMappedData.columns else lit(None),
            'callDataType': col('new.callDataType') if 'callDataType' in dfMappedData.columns else lit(callDataType),
            'legacyColumn': col('new.legacyColumn') if 'legacyColumn' in dfMappedData.columns else lit(None),
            'dataValue': col('new.dataValue'),
            'isEncrypted': col('new.isEncrypted') if 'isEncrypted' in dfMappedData.columns else lit(False),
            'silverCreateOn': lit(datetime.now(timezone.utc).timestamp())
        }
    ).execute()
```

As one example of the changes I tried, here is a modified version of `processMappedCallData()` that breaks the data into multiple upsert calls of roughly 300M rows each rather than one large dataframe of ~2.4B rows. Both the original and this version fail, and the error is always the same: `java.lang.OutOfMemoryError: Java heap space`.

```
def processMappedCallData(columns_to_convert: list):
    dfNewMap = spark.read.table('silver.appdata.call_data_map') \
        .where(col('mapType') == 'columns') \
        .alias('dm')

    dfCallDataSub = dfCallData.select('callId', 'UniqueCallId', col('agentId'), *columns_to_convert) \
        .alias('cd')

    for c in columns_to_convert:
        df = dfCallDataSub.join(dfNewMap, (col('cd.agentId') == col('dm.agentId')) & (col('dm.mapKey') == c), 'inner') \
            .where((col(f'cd.{c}').isNotNull()) & (col(f'cd.{c}') != '')) \
            .withColumn('callDataId', lit(None)) \
            .withColumn('callDataType', lit('columns:mapped')) \
            .select('callDataId', 'cd.callId', 'dm.callDataMapId', 'callDataType', lit(c).alias('legacyColumn'),
                    col(f'cd.{c}').alias('dataValue'))

        # Upsert each column's chunk immediately instead of unioning everything first
        _pipeline.execute_call_data_pipeline(df, callDataType='columns')
```

1 REPLY

jose_gonzalez
Moderator

Please compare the Spark UI from the old job execution with the new one. You should also check whether the data volume has increased, as that could be the cause of the OOM.
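
As a rough sketch of that check, you could count the joined input directly, reusing `fetchCallSummaryData` and `_callMap` from your notebook (assumed to be in scope); `count()` runs on the executors, so it is a cheap way to compare today's volume against what the old runs processed:

```
from pyspark.sql.functions import col, lit

# Same join that feeds processMappedCallData in the original notebook
dfJoined = fetchCallSummaryData(useArchive=False) \
    .join(_callMap,
          on=(col('UniqueCallID') == _callMap.systemCallId) & (_callMap.systemType != lit('NOVA')),
          how='inner')

print(f"Rows feeding processMappedCallData: {dfJoined.count():,}")

# Per-column counts of the non-empty values that actually become silver rows
for c in ['Pub1', 'Pub2', 'Pub3', 'Pub4', 'Pub5', 'Pub6', 'Pub7', 'Pub8']:
    n = dfJoined.where(col(c).isNotNull() & (col(c) != '')).count()
    print(f"{c}: {n:,} non-empty values")
```

If these numbers are noticeably higher than in the runs that used to succeed, the input growth is the likely cause.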
