Data Engineering

Optimizing .collect() Usage in Spark

jeremy98
Contributor III

Hi all!

I'm facing a driver memory issue after deploying a cluster with 14 GB of memory. My code uses the cluster's compute continuously (it never shuts down, because at the moment that's the only way I can communicate with the Azure PostgreSQL database). While reviewing my code, I noticed that some parts use .collect() to retrieve a Spark DataFrame as a list of rows.

Since I need to import the data row by row, I'm looking for an alternative approach that avoids .collect() while achieving the same result efficiently.

Here's the current (inefficient) code:

if num_rows > 0:
    # .collect() pulls every matching row onto the driver as a list of Row objects
    delete_data = [tuple(row) for row in records_to_delete_df.collect()]
    delete_query = syncer._generate_delete_statement(table_name, info_logic['primary_keys'])

The _generate_delete_statement function returns a DELETE SQL statement, as shown below:

def _generate_delete_statement(self, table_name: str, primary_keys: str) -> str:
    """Generate DELETE SQL statement."""
    columns = [col.strip() for col in primary_keys.split(",")]
    where_conditions = " AND ".join([f"{col} = %s" for col in columns])
    return f"""DELETE FROM {table_name} WHERE {where_conditions};"""
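
For illustration (the table and key names here are made up), calling it as

_generate_delete_statement("my_table", "id, tenant_id")

returns DELETE FROM my_table WHERE id = %s AND tenant_id = %s; with one %s placeholder per primary-key column.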

Is there a way to avoid using .collect() while maintaining the same functionality?

Thanks in advance!

7 Replies

jeremy98
Contributor III

Bumping this up. Any suggestions, please?

cgrant
Databricks Employee

If you are deleting from a Delta Lake table, a more scalable strategy would be to write your records_to_delete_df to a table, and then use the MERGE command to delete where you have matches.

MERGE INTO {target_table} target
USING records_to_delete source
ON source.{col} = target.{col} -- add more columns here, AND them together
WHEN MATCHED THEN DELETE 

If you're instead deleting from Postgres, you'd use the same strategy; the syntax would just be a bit different (a sketch follows below).
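
For instance, a minimal sketch of the Postgres variant (the staging table name, key columns, and connection values below are placeholders, not details from this thread) could look like:

# Hypothetical sketch: stage the keys via Spark's JDBC writer, then run one
# set-based DELETE on the Postgres side.
(records_to_delete_df.write
    .format("jdbc")
    .option("url", "jdbc:postgresql://<host>:<port>/<db_name>")
    .option("dbtable", "records_to_delete_staging")
    .option("user", "<db_username>")
    .option("password", "<db_password>")
    .mode("overwrite")
    .save())

delete_sql = """
    DELETE FROM <target_table> AS t
    USING records_to_delete_staging AS s
    WHERE t.<key_col> = s.<key_col>  -- add more key columns here, AND them together
"""
with conn.cursor() as cursor:  # conn: an open psycopg2 connection
    cursor.execute(delete_sql)
conn.commit()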

jeremy98
Contributor III

Hello,

Thanks for your response. Yes, I need to sync Databricks with PostgreSQL. I was using psycopg2 for this purpose:

# Imports needed for this snippet (the function is a method on the syncer class)
from typing import Dict, List

import psycopg2
from psycopg2 import extras


def _execute_dml(self, query: str, data: List[tuple], connection_properties: Dict, operation: str, batch_size: int = 1000) -> None:
    """
    Execute DML operations using psycopg2.extras.execute_batch for better performance.

    Args:
        query: SQL query to execute
        data: List of parameter tuples
        connection_properties: Database connection details
        operation: Type of operation (INSERT, UPDATE, DELETE)
        batch_size: Number of rows per batch (default: 1000)
    """
    conn = psycopg2.connect(
        dbname=connection_properties['db_name'],
        user=connection_properties['db_username'],
        password=connection_properties['db_password'],
        host=connection_properties['host'],
        port=connection_properties['port']
    )

    # The connection context manager commits the transaction on success
    with conn:
        with conn.cursor() as cursor:
            print(f"Executing {operation} operation in batches of {batch_size}")
            extras.execute_batch(cursor, query, data, page_size=batch_size)
            affected_rows = cursor.rowcount
        print(f"Successfully executed {operation} operation. Affected rows: {affected_rows}")

    conn.close()
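
For context, a simplified call site for this helper (reconstructed from the snippets above, so the exact names are assumptions) would look roughly like:

delete_query = syncer._generate_delete_statement(table_name, info_logic['primary_keys'])
syncer._execute_dml(
    query=delete_query,
    data=delete_data,
    connection_properties=connection_properties,  # dict with db_name, db_username, db_password, host, port
    operation="DELETE",
)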

However, the data needs to be passed as a list of tuples. Since I was using a Spark DataFrame, I converted it using .collect(), but this can cause an out-of-memory (OOM) issue after about 10 days of operation.

Since we are using a cluster that stays up continuously, is there any way to clean up the driver memory after every finished job that uses this cluster?

Thanks in advance for your help!

cgrant
Databricks Employee

Data from collect() is automatically garbage collected once it goes out of scope. However, collect() is not recommended for larger data. Instead, you can:

  • Write to a staging table in Postgres via Spark's JDBC connector, then issue a command via JDBC that performs the delete between the staging table and your target. On the Spark side, this operation is distributed among the worker nodes, with much less memory usage on the driver.
  • If you must use the Spark driver to perform this, try using toLocalIterator() instead of collect(), which avoids memory problems by fetching the data in pieces (see the sketch after this list).
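
For example, a minimal sketch of the second option, assuming the delete_query and psycopg2 connection objects from the earlier snippets (those names are assumptions, not code from this thread), could stream the rows in fixed-size batches instead of building one big list:

from itertools import islice

# Hypothetical sketch: toLocalIterator() pulls the DataFrame to the driver one
# partition at a time; flushing fixed-size batches means the full result set is
# never held in driver memory at once.
rows = (tuple(row) for row in records_to_delete_df.toLocalIterator())
batch_size = 1000
with conn.cursor() as cursor:  # conn: an open psycopg2 connection
    while True:
        batch = list(islice(rows, batch_size))
        if not batch:
            break
        extras.execute_batch(cursor, delete_query, batch, page_size=batch_size)
conn.commit()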

jeremy98
Contributor III

Hi,

Thanks for the answer. For the moment I'm going to follow the second suggestion and use toLocalIterator(). For example, I changed my line of code to this call:

delete_data = [tuple(row) for row in records_to_delete_df.toLocalIterator()]

But I was thinking: with this method, Spark collects the data in pieces, right? So it doesn't pull the whole dataset onto the driver at once. If the cluster stays active, could an OOM still arise in the future with this kind of call? Does the driver free the memory after each piece?

cgrant
Databricks Employee

I would expect both the Python process on the driver and Spark's JVM to release memory once you are done with each chunk of data; otherwise, this sounds like a memory leak. If you suspect the problem is in the JVM, you can look at heap dumps, which are available in the Spark UI under the Executors tab, and you can also enable heap dumps on OOM. The usual suspects are things like HTTP sessions or some other kind of session not being released properly.
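
For reference, a minimal sketch of a cluster Spark config for capturing heap dumps on OOM (the dump path below is just a placeholder):

spark.driver.extraJavaOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dbfs/tmp/heapdumps
spark.executor.extraJavaOptions -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/dbfs/tmp/heapdumps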

jeremy98
Contributor III

Hi,

Thanks for your answers. What kind of HTTP sessions do you mean? We have jobs that are triggered through the API and run on this always-on cluster. Could you give me an example of where to look in the Spark UI? I'm not an expert, so I'm not sure where to start. Thanks in advance!
