I have a notebook in Azure Databricks that does some transformations on a bronze tier table and inserts the transformed data into a silver tier table. This notebook is used to do an initial load of the data from our existing system into our new datalake. The section at issue converts a list of columns from a bronze table and inserts them as rows into a silver table. I've been running this code for weeks without issue: same code, same data, same cluster configuration (no changes). Suddenly, I am now consistently receiving an `OutOfMemory` error when I execute this particular section of the notebook. The data is not large, and I'm only inserting a dataframe with 6 columns.
I've tried numerous approaches (both code and server configuration) to fix the issue with no luck. Hoping someone can help me out. The original source code is below.
```
# Call summary rows joined to the call map, excluding map entries whose
# systemType is 'NOVA', then narrowed to the columns listed below.
dfCallData = (
    fetchCallSummaryData(useArchive=False)
    .join(
        _callMap,
        on=(col('UniqueCallID') == _callMap.systemCallId) & (_callMap.systemType != lit('NOVA')),
        how='inner',
    )
    .select(*[
        col(column_name)
        for column_name in (
            'UniqueCallId', 'tenantId', 'agentId', 'channelType', 'callerPhone', 'callerName',
            'terminationType', 'callDuration', 'transferDuration', 'callOutcome', 'transferReason',
            'endEvent', 'lastStepName', 'reportDate', 'callInitiatedOn', 'lastSystemUpdateOn',
            'isFirstTimeCaller', 'isAfterHours', 'finalStatus', 'clk.isArchived', 'clk.archivedOn',
            'Conversation', 'QueueDuration', 'AgentDuration', 'TransferTalkDuration', 'WhisperDuration',
            'BillingRatePerIVRMinute', 'BillingRatePerTransferMinute', 'BillingRatePerCall',
            'BillingGracePeriod', 'CustomerData', 'CustomerDataFlattened', 'RunMode', 'SurveyResults',
            'RedactedConversation', 'Disposition',
            'Col1', 'Col2', 'Col3', 'Col4', 'Col5', 'Col6', 'Col7', 'Col8',
            'Pub1', 'Pub2', 'Pub3', 'Pub4', 'Pub5', 'Pub6', 'Pub7', 'Pub8',
            'PubBreadCrumbs', 'PubFlattened', 'PubLastBreadCrumb', 'PubIntent', 'PubAuthenticated',
            'Transcript', 'clk.callId',
        )
    ])
)
# Legacy 'Pub' columns that get converted into rows: Pub1 .. Pub8.
pub_cols = [f'Pub{i}' for i in range(1, 9)]
# Original method signature
def processMappedCallData(columns_to_convert:list) -> DataFrame:
    """Turn the given legacy columns of ``dfCallData`` into one row per value.

    Each column name is matched against the 'columns' entries of
    ``silver.appdata.call_data_map`` (same agentId, mapKey equal to the
    column name). Rows whose value in that column is null or the empty
    string are dropped. The per-column results are unioned together.

    Args:
        columns_to_convert: names of the ``dfCallData`` columns to convert.

    Returns:
        A DataFrame with the columns callDataId (always null), callId,
        callDataMapId, callDataType ('columns:mapped'), legacyColumn and
        dataValue; ``None`` when ``columns_to_convert`` is empty.
    """
    mapping = (
        spark.read.table('silver.appdata.call_data_map')
        .where(col('mapType') == 'columns')
        .alias('dm')
    )
    calls = dfCallData.select(
        'callId', 'UniqueCallId', col('agentId'), *columns_to_convert
    ).alias('cd')

    def rows_for(column_name):
        # One frame holding the non-empty values of a single legacy column.
        matched = calls.join(
            mapping,
            (col('cd.agentId') == col('dm.agentId')) & (col('dm.mapKey') == column_name),
            'inner',
        )
        populated = matched.where(
            (col(f'cd.{column_name}').isNotNull()) & (col(f'cd.{column_name}') != '')
        )
        tagged = populated.withColumn('callDataId', lit(None))
        tagged = tagged.withColumn('callDataType', lit('columns:mapped'))
        return tagged.select(
            'callDataId',
            'cd.callId',
            'dm.callDataMapId',
            'callDataType',
            lit(column_name).alias('legacyColumn'),
            col(f'cd.{column_name}').alias('dataValue'),
        )

    combined = None
    for column_name in columns_to_convert:
        frame = rows_for(column_name)
        if combined is None:
            combined = frame
        else:
            combined = combined.union(frame)
    return combined
# Convert the Pub1..Pub8 columns into rows (one unioned DataFrame).
# NOTE(review): every row produced here has callDataId = lit(None), so the
# merge condition 'old.callDataId = new.callDataId' presumably never matches
# and all rows take the insert branch - confirm this is intended.
dfPubCols = processMappedCallData(pub_cols)
# Upsert the rows into call_data. dfPubCols already carries a callDataType
# column ('columns:mapped'), and the pipeline prefers that column over its
# callDataType argument, so the 'columns' value passed here is not used.
_pipeline.execute_call_data_pipeline(dfPubCols, callDataType='columns')
# Upsert
def execute_call_data_pipeline(self, dfMappedData:DataFrame, callDataType='columns:mapped'):
    """Merge ``dfMappedData`` into the ``call_data`` Delta table.

    Rows are matched on ``callDataId``. Matched rows are updated and
    unmatched rows are inserted. For the optional columns, the incoming
    value is used when ``dfMappedData`` has that column; otherwise an
    update keeps the existing value and an insert uses a default.

    Args:
        dfMappedData: source rows. The merge reads ``callDataId``,
            ``callId`` and ``dataValue`` from it unconditionally.
        callDataType: value inserted as ``callDataType`` only when
            ``dfMappedData`` has no ``callDataType`` column.
    """
    target = DeltaTable.forName(self._spark, f'{self.get_catalog()}.{self.get_schema()}.call_data')
    merge_builder = target.alias('old').merge(
        source=dfMappedData.alias('new'),
        condition=expr('old.callDataId = new.callDataId')
    )
    incoming = dfMappedData.columns

    def new_or_old(name):
        # Update rule: incoming value if supplied, else keep the stored one.
        if name in incoming:
            return col(f'new.{name}')
        return col(f'old.{name}')

    def new_or_default(name, default):
        # Insert rule: incoming value if supplied, else a literal default.
        if name in incoming:
            return col(f'new.{name}')
        return lit(default)

    update_set = {
        name: new_or_old(name)
        for name in ('callId', 'callDataMapId', 'callDataType', 'legacyColumn', 'dataValue', 'isEncrypted')
    }
    # Current UTC time as epoch seconds (float).
    update_set['silverUpdateOn'] = lit(datetime.now(timezone.utc).timestamp())
    merge_builder = merge_builder.whenMatchedUpdate(set=update_set)

    insert_values = {
        'callId': col('new.callId'),
        'callDataMapId': new_or_default('callDataMapId', None),
        'callDataType': new_or_default('callDataType', callDataType),
        'legacyColumn': new_or_default('legacyColumn', None),
        'dataValue': col('new.dataValue'),
        'isEncrypted': new_or_default('isEncrypted', False),
        'silverCreateOn': lit(datetime.now(timezone.utc).timestamp()),
    }
    merge_builder.whenNotMatchedInsert(values=insert_values).execute()
```
As one example of the changes I made, here is a modified `processMappedCallData()` that splits the work into multiple upsert calls, one per column, of ~300M rows each rather than a single large dataframe of ~2.4B rows. Both the original and this version fail, and the error is always the same: `java.lang.OutOfMemoryError: Java heap space`
```
def processMappedCallData(columns_to_convert:list):
    """Upsert the given legacy columns of ``dfCallData``, one merge per column.

    For each column name, the call rows are joined to the 'columns' entries
    of ``silver.appdata.call_data_map`` (same agentId, mapKey equal to the
    column name), rows whose value is null or the empty string are dropped,
    and the result is handed to ``_pipeline.execute_call_data_pipeline``.

    Args:
        columns_to_convert: names of the ``dfCallData`` columns to convert.
            An empty list results in no upsert calls.

    Returns:
        None. The work is done through the upsert side effect.
    """
    dfNewMap = spark.read.table('silver.appdata.call_data_map') \
        .where(col('mapType') == 'columns') \
        .alias('dm')
    dfCallDataSub = dfCallData.select('callId', 'UniqueCallId', col('agentId'), *columns_to_convert) \
        .alias('cd')
    # The agent match does not depend on the column, so build it once.
    same_agent = col('cd.agentId') == col('dm.agentId')
    for c in columns_to_convert:
        df = dfCallDataSub.join(dfNewMap, same_agent & (col('dm.mapKey') == c), 'inner') \
            .where((col(f'cd.{c}').isNotNull()) & (col(f'cd.{c}') != '')) \
            .withColumn('callDataId', lit(None)) \
            .withColumn('callDataType', lit('columns:mapped')) \
            .select('callDataId', 'cd.callId', 'dm.callDataMapId', 'callDataType', lit(c).alias('legacyColumn'),
                    col(f'cd.{c}').alias('dataValue'))
        # Each column is merged on its own to keep the source frame small.
        _pipeline.execute_call_data_pipeline(df, callDataType='columns')
```