jeremy98
Honored Contributor

 

Hello,

Thanks for your response. Yes, I need to sync Databricks with PostgreSQL. I was using psycopg2 for this purpose:

 

 

import psycopg2
from psycopg2 import extras
from typing import Dict, List

def _execute_dml(self, query: str, data: List[tuple], connection_properties: Dict, operation: str, batch_size: int = 1000) -> None:
    """
    Execute DML operations using psycopg2.extras.execute_batch for better performance.

    Args:
        query: SQL query to execute
        data: List of parameter tuples
        connection_properties: Database connection details
        operation: Type of operation (INSERT, UPDATE, DELETE)
        batch_size: Number of rows per batch (default: 1000)
    """
    conn = psycopg2.connect(
        dbname=connection_properties['db_name'],
        user=connection_properties['db_username'],
        password=connection_properties['db_password'],
        host=connection_properties['host'],
        port=connection_properties['port']
    )

    try:
        # `with conn` commits on success and rolls back on error;
        # it does not close the connection, hence the finally block.
        with conn:
            with conn.cursor() as cursor:
                print(f"Executing {operation} operation in batches of {batch_size}")
                extras.execute_batch(cursor, query, data, page_size=batch_size)
                # After execute_batch, rowcount may reflect only the last
                # statement executed rather than the total across all batches.
                affected_rows = cursor.rowcount
        print(f"Successfully executed {operation} operation. Affected rows: {affected_rows}")
    finally:
        # Always release the connection, even if the batch fails.
        conn.close()

 

 

However, the data needs to be passed as a list of tuples. Since my data is in a Spark DataFrame, I converted it with .collect(), which pulls every row into driver memory. On our cluster this ends in an out-of-memory (OOM) error after about 10 days of operation.
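
To make the conversion step concrete, here is a minimal sketch of one way the rows could be turned into tuples in fixed-size chunks rather than all at once. The helper name iter_batches is hypothetical (it is not part of my current code), and the Spark usage in the trailing comment assumes DataFrame.toLocalIterator(), which streams rows to the driver partition by partition instead of materializing the whole result. I have not verified that this resolves the OOM on our cluster.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Tuple


def iter_batches(rows: Iterable, batch_size: int = 1000) -> Iterator[List[Tuple]]:
    """Yield lists of at most batch_size tuples from any iterable of rows.

    Hypothetical helper: only one batch is held in memory at a time.
    """
    if batch_size <= 0:
        raise ValueError("batch_size must be positive")
    it = iter(rows)
    while True:
        # Spark Row objects are tuple subclasses, so tuple(row) works on them too.
        batch = [tuple(row) for row in islice(it, batch_size)]
        if not batch:
            return
        yield batch


# Assumed usage with Spark (df is the DataFrame being synced):
#   for batch in iter_batches(df.toLocalIterator(), 1000):
#       self._execute_dml(query, batch, connection_properties, "INSERT")
```

Note that with _execute_dml as written above, each batch would open and close its own PostgreSQL connection and commit separately, so the sync would no longer be a single transaction.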

Since we are using a cluster that stays up continuously, is there any way to clean up the driver memory after every finished job that uses this cluster?

Thanks in advance for your help!