cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.
cancel
Showing results for 
Search instead for 
Did you mean: 

Databricks OutOfMemory error on code that previously worked without issue

shanebo425
New Contributor III

I have a notebook in Azure Databricks that does some transformations on a bronze tier table and inserts the transformed data into a silver tier table. This notebook is used to do an initial load of the data from our existing system into our new datalake. The section at issue converts a list of columns from a bronze table and inserts them as rows into a silver table. I've been running this code for weeks without issue: same code, same data, same cluster configuration (no changes). Suddenly, I am now consistently receiving an `OutOfMemory` error when I execute this particular section of the notebook. The data is not large, and I'm only inserting a dataframe with 6 columns.

I've tried numerous approaches (both code and server configuration) to fix the issue with no luck. Hoping someone can help me out. The original source code is below.


```
dfCallData = fetchCallSummaryData(useArchive=False) \
.join(_callMap, on=(col('UniqueCallID') == _callMap.systemCallId) & (_callMap.systemType != lit('NOVA')), how='inner') \
.select(col('UniqueCallId'), col('tenantId'), col('agentId'), col('channelType'), col('callerPhone'), col('callerName'), col('terminationType'),
col('callDuration'), col('transferDuration'), col('callOutcome'), col('transferReason'), col('endEvent'), col('lastStepName'), col('reportDate'),
col('callInitiatedOn'), col('lastSystemUpdateOn'), col('isFirstTimeCaller'), col('isAfterHours'), col('finalStatus'), col('clk.isArchived'),
col('clk.archivedOn'), col('Conversation'), col('QueueDuration'), col('AgentDuration'), col('TransferTalkDuration'), col('WhisperDuration'),
col('BillingRatePerIVRMinute'), col('BillingRatePerTransferMinute'), col('BillingRatePerCall'), col('BillingGracePeriod'), col('CustomerData'),
col('CustomerDataFlattened'), col('RunMode'), col('SurveyResults'), col('RedactedConversation'), col('Disposition'), col('Col1'), col('Col2'),
col('Col3'), col('Col4'), col('Col5'), col('Col6'), col('Col7'), col('Col8'), col('Pub1'), col('Pub2'), col('Pub3'), col('Pub4'), col('Pub5'),
col('Pub6'), col('Pub7'), col('Pub8'), col('PubBreadCrumbs'), col('PubFlattened'), col('PubLastBreadCrumb'), col('PubIntent'),
col('PubAuthenticated'), col('Transcript'), col('clk.callId'))

pub_cols = ['Pub1', 'Pub2', 'Pub3', 'Pub4', 'Pub5', 'Pub6', 'Pub7', 'Pub8']

# Original method signature
def processMappedCallData(columns_to_convert:list) -> DataFrame:
dfNewMap = spark.read.table('silver.appdata.call_data_map') \
.where(col('mapType') == 'columns') \
.alias('dm')

dfCallDataSub = dfCallData.select('callId', 'UniqueCallId', col('agentId'), *columns_to_convert) \
.alias('cd')

dfData = None
for c in columns_to_convert:
df = dfCallDataSub.join(dfNewMap, (col('cd.agentId') == col('dm.agentId')) & (col('dm.mapKey') == c), 'inner') \
.where((col(f'cd.{c}').isNotNull()) & (col(f'cd.{c}') != '')) \
.withColumn('callDataId', lit(None)) \
.withColumn('callDataType', lit('columns:mapped')) \
.select('callDataId', 'cd.callId', 'dm.callDataMapId', 'callDataType', lit(c).alias('legacyColumn'),
col(f'cd.{c}').alias('dataValue'))

dfData = dfData.union(df) if dfData is not None else df

return dfData

dfPubCols = processMappedCallData(pub_cols)
_pipeline.execute_call_data_pipeline(dfPubCols, callDataType='columns')


# Upsert
def execute_call_data_pipeline(self, dfMappedData:DataFrame, callDataType='columns:mapped'):
dtCallData = DeltaTable.forName(self._spark, f'{self.get_catalog()}.{self.get_schema()}.call_data')
dtCallData.alias('old').merge(
source=dfMappedData.alias('new'),
condition=expr('old.callDataId = new.callDataId')
).whenMatchedUpdate(set=
{
'callId': col('new.callId') if 'callId' in dfMappedData.columns else col('old.callId'),
'callDataMapId': col('new.callDataMapId') if 'callDataMapId' in dfMappedData.columns else col('old.callDataMapId'),
'callDataType': col('new.callDataType') if 'callDataType' in dfMappedData.columns else col('old.callDataType'),
'legacyColumn': col('new.legacyColumn') if 'legacyColumn' in dfMappedData.columns else col('old.legacyColumn'),
'dataValue': col('new.dataValue') if 'dataValue' in dfMappedData.columns else col('old.dataValue'),
'isEncrypted': col('new.isEncrypted') if 'isEncrypted' in dfMappedData.columns else col('old.isEncrypted'),
'silverUpdateOn': lit(datetime.now(timezone.utc).timestamp())
}
).whenNotMatchedInsert(values=
{
'callId': col('new.callId'),
'callDataMapId': col('new.callDataMapId') if 'callDataMapId' in dfMappedData.columns else lit(None),
'callDataType': col('new.callDataType') if 'callDataType' in dfMappedData.columns else lit(callDataType),
'legacyColumn': col('new.legacyColumn') if 'legacyColumn' in dfMappedData.columns else lit(None),
'dataValue': col('new.dataValue'),
'isEncrypted': col('new.isEncrypted') if 'isEncrypted' in dfMappedData.columns else lit(False),
'silverCreateOn': lit(datetime.now(timezone.utc).timestamp())
}
).execute()

```

As one example of the changes I made, here is a change to the `processMappedCallData()` method to break the data into multiple upsert calls in smaller chunks of ~300M rows rather than one large dataframe of ~2.4B rows. Both the original and this one failed. The error is always the same: `java.lang.OutOfMemoryError: Java heap space`

```
def processMappedCallData(columns_to_convert:list):
dfNewMap = spark.read.table('silver.appdata.call_data_map') \
.where(col('mapType') == 'columns') \
.alias('dm')

dfCallDataSub = dfCallData.select('callId', 'UniqueCallId', col('agentId'), *columns_to_convert) \
.alias('cd')

dfData = None
for c in columns_to_convert:
df = dfCallDataSub.join(dfNewMap, (col('cd.agentId') == col('dm.agentId')) & (col('dm.mapKey') == c), 'inner') \
.where((col(f'cd.{c}').isNotNull()) & (col(f'cd.{c}') != '')) \
.withColumn('callDataId', lit(None)) \
.withColumn('callDataType', lit('columns:mapped')) \
.select('callDataId', 'cd.callId', 'dm.callDataMapId', 'callDataType', lit(c).alias('legacyColumn'),
col(f'cd.{c}').alias('dataValue'))
_pipeline.execute_call_data_pipeline(df, callDataType='columns')
```

1 REPLY 1

jose_gonzalez
Moderator
Moderator

Please review your Spark UI from the old job execution versus the new job execution. You might need to check if the data volume has increase and that could be the reason of the OOM

Join 100K+ Data Experts: Register Now & Grow with Us!

Excited to expand your horizons with us? Click here to Register and begin your journey to success!

Already a member? Login and join your local regional user group! If there isn’t one near you, fill out this form and we’ll create one for you to join!