Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Driver Crash on processing large dataframe

desertstorm
New Contributor II

I have a dataframe with about 2 million text rows (~1 GB). I repartition it into about 700 partitions, as that is the number of cores available on my cluster's executors. I run transformations that extract medical information and then write the results as Parquet to S3. The process runs for about 3 hours and then crashes. The driver fails with the following error:

The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached.

I have tried drivers with both 128 GB and 256 GB of memory but end up with the same result. I have also used the persist option, with similar crashes:

df.persist(StorageLevel.MEMORY_AND_DISK)
Would appreciate any help from the community on how to solve this.

 

8 REPLIES

Lakshay
Databricks Employee

Hi @desertstorm , The error "The spark driver has stopped unexpectedly and is restarting. Your notebook will be automatically reattached." usually happens when the driver is under memory pressure. This means that there is a piece of code executing on the driver rather than on the executors. We need to identify and remove that piece of code.

Here are some general things to watch out for:

1. If your code has display() or collect() operations, remove them.

2. If your code has plain (driver-side) Python logic, replace it with PySpark.

desertstorm
New Contributor II

Hi @Lakshay Thanks so much for your reply. I have looked into most of those options and don't see any plain Python code; it's mostly pipeline.transform calls. Here is the code where it crashes. I don't think withColumn or the Parquet write should bring data to the driver either, so I'm not sure what's wrong. Happy to share the file as well.


from pyspark import StorageLevel
from pyspark.sql.functions import lower, col
import datetime
df = df_joined.repartition(768)

df.persist(StorageLevel.MEMORY_AND_DISK)

# Get the current date and time
formatted_date_time = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
print(formatted_date_time)

# Process rxnorm results
rxnorm_result = (
    process_rxnorm_results(
        rxnorm_pipelineModel.transform(
            process_posology_relations(
                model_posology.transform(df)
            )
        )
    )
    .withColumn('drug_resolution', lower(col('resolutions')[0]))
    .withColumn('chunk1', lower(col('chunk1')))
    .withColumn('chunk2', lower(col('chunk2')))
)
rxnorm_result.persist(StorageLevel.MEMORY_AND_DISK)
outname_rx_results = f'{rx_norm_path}/rx_norm/rxnorm_part_{formatted_date_time}.parquet'
rxnorm_result.write.format("parquet").save(outname_rx_results)
df.unpersist()
rxnorm_result.unpersist()

Lakshay
Databricks Employee

Hi @desertstorm , I think the issue is with the "Process rxnorm results" part of the code. You can comment out that part to confirm whether it is the cause.
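One way to act on this suggestion is to materialize the pipeline one stage at a time, writing each intermediate result out before running the next stage. This is a sketch reusing the model and path names from the snippet above; it is not runnable outside the poster's environment.

```
# Sketch: run each stage separately and write it out, so the crash points at
# a specific stage instead of the whole fused pipeline.
df = df_joined.repartition(768)

stage1 = model_posology.transform(df)
stage1.write.mode("overwrite").parquet(f"{rx_norm_path}/debug/stage1")

stage2 = process_posology_relations(spark.read.parquet(f"{rx_norm_path}/debug/stage1"))
stage2.write.mode("overwrite").parquet(f"{rx_norm_path}/debug/stage2")

stage3 = rxnorm_pipelineModel.transform(spark.read.parquet(f"{rx_norm_path}/debug/stage2"))
stage3.write.mode("overwrite").parquet(f"{rx_norm_path}/debug/stage3")
# ...and so on for process_rxnorm_results; whichever write fails is the culprit.
```

Reading each stage back from Parquet also truncates the lineage, so a retry of a later stage does not re-run the earlier models.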

jose_gonzalez
Databricks Employee

Just wondering, where is the magic number "768" in your repartition coming from? How big is your cluster? What about your driver instance?

desertstorm
New Contributor II

That's the number of cores available on the executors. I have tried the driver with 256 GB as well as 128 GB, with the same results.

jose_gonzalez
Databricks Employee

Can you try to split the data?

Do you have any collect() calls or other driver-heavy actions that could cause this error on the driver?

Svish
New Contributor III

I am encountering the same issue. My dataframe is about 7 million rows. I tried reducing the dataframe size; for anything over a million rows, the write operation doesn't finish and I see the driver error.

# (enclosing request/pagination loop elided in the original post)
response = requests.post(url=url, auth=credentials, headers=headers, data=payload)
status = response.status_code
if status == 200:
    r = response.json()
    try:
        data = r['data']
        project = data[nextTableName]
        df = pd.DataFrame(project)
        dfarry.append(df)
        pagination = data['pagination']
        for p in pagination:
            nextTableName = p['nextTableName']
            print(nextTableName)
            nextKey = p['nextKey']
            print(nextKey)
    except ChunkedEncodingError:
        print("ChunkedEncodingError")
        if retry < 3:
            retry = retry + 1
            continue
        else:
            print("Too many retries")
            break
else:
    print("Call Failed")
    print(status)
    break
else:  # loop-else: runs once the pagination loop above finishes normally
    nextTableName = "ACTIVITY"
    Vname = "tempActivity"
    nextKey = "1"
    # Convert the accumulated pandas dataframes to one Spark dataframe for batch load
    if len(dfarry) > 1:
        loadDf = dfarry[0]
        loadDF = spark.createDataFrame(loadDf)
        dfarry = dfarry[1:]
        for a in dfarry:
            A = spark.createDataFrame(a)
            loadDF = loadDF.unionByName(A, allowMissingColumns=True)
    else:
        loadDf = dfarry[0]
        loadDF = spark.createDataFrame(loadDf)
    print("loadDF_ready")
    loadDF = (loadDF
              .withColumn('Load_TimeStamp', current_timestamp())
              .withColumn('Source_Table', lit(nextTableName))
              .withColumn('ConfigCode', lit('ds_reportuser')))
    spark.sql("DROP VIEW IF EXISTS " + Vname)
    # Drop the table if it exists; the write below recreates it
    spark.sql("DROP TABLE IF EXISTS P6Activity_V5")
    loadDF.write.mode("append").saveAsTable("P6Activity_V5")
    print("Table P6Activity_V5 created")

Isi
Contributor

Hey @Svish ,

Your problem is probably caused by using Pandas. Pandas loads all the data into the driver memory, which is likely why you are experiencing issues. If you can modify your code to use Spark instead, you will probably avoid this problem.

However, if switching to PySpark is not an option, I recommend increasing the driver size to handle the larger data load.

🙂