
Generating a PostgreSQL Table Schema for ETL in Databricks

WiliamRosa
New Contributor II

In a data migration project, I needed to generate the schema of a PostgreSQL table to use in my ETL process. I’d like to share the code snippet in case someone else needs it one day:

import json
from typing import Any, Dict, Optional

# Note: `spark` and `dbutils` are globals provided by the Databricks notebook runtime

# PostgreSQL -> Spark SQL type mapping
POSTGRES_TYPE_MAPPING = {
    'int': 'IntegerType',
    'integer': 'IntegerType',
    'bigint': 'LongType',        # 64-bit: IntegerType would overflow
    'smallint': 'ShortType',
    'tinyint': 'IntegerType',
    'varchar': 'StringType',
    'character varying': 'StringType',
    'char': 'StringType',
    'text': 'StringType',
    'boolean': 'BooleanType',
    'double': 'DoubleType',
    'double precision': 'DoubleType',  # name reported by information_schema
    'float': 'FloatType',
    'real': 'FloatType',
    'numeric': 'DecimalType',
    'decimal': 'DecimalType',
    'date': 'DateType',
    'timestamp': 'TimestampType',
    'timestamp without time zone': 'TimestampType',
    'timestamp with time zone': 'TimestampType',
    'bytea': 'BinaryType'
}

def adjust_dtype(dtype_str: str) -> str:
    """
    Maps PostgreSQL data types to Spark SQL Types.
    Falls back to StringType for unknown types.
    """
    base_type = dtype_str.lower().strip()
    mapped = POSTGRES_TYPE_MAPPING.get(base_type)
    if not mapped:
        print(f"[WARN] Unmapped type: {base_type}. Defaulting to StringType.")
        return "StringType"
    return mapped

def print_formatted_postgres_schema(schema_dict: Dict[str, Any], table_name: str):
    """
    Prints schema in a formatted dictionary style for readability.
    """
    print(f"{table_name}_schema = {{")
    for field, props in schema_dict.items():
        print(f"    '{field}': {{")
        print(f"        'size': {props['size']},")
        print(f"        'dtype': '{props['dtype']}',")
        print(f"        'unique': {props['unique']},")
        print(f"        'nullable': {props['nullable']}")
        print(f"    }},")
    print("}")

def generate_postgres_schema_dict(
    jdbc_url: str,
    schema: str,
    table: str,
    secret_scope: str,
    export_path: Optional[str] = None
) -> Dict[str, Any]:
    """
    Generates schema dictionary from PostgreSQL metadata.
    Optionally exports schema as JSON file to DBFS or local path.
    """

    user = dbutils.secrets.get(scope=secret_scope, key="postgres_user")
    password = dbutils.secrets.get(scope=secret_scope, key="postgres_password")

    query = f"""
    SELECT 
        column_name,
        is_nullable = 'YES' AS nullable,
        EXISTS (
            SELECT 1
            FROM information_schema.table_constraints tc
            JOIN information_schema.constraint_column_usage ccu
              ON tc.constraint_name = ccu.constraint_name
            WHERE tc.constraint_type = 'UNIQUE'
              AND tc.table_schema = cols.table_schema
              AND tc.table_name = cols.table_name
              AND ccu.column_name = cols.column_name
        ) AS is_unique,
        data_type,
        character_maximum_length,
        numeric_precision
    FROM information_schema.columns cols
    WHERE table_schema = '{schema}' AND table_name = '{table}'
    """

    # Read the table's metadata over JDBC, using only .option() calls
    df_metadata = (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("query", query)
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .load()
    )

    schema_dict = {}
    for row in df_metadata.toLocalIterator():  # safer than collect() for large tables
        column = row["column_name"]
        data_type = row["data_type"]
        size = row["character_maximum_length"] or row["numeric_precision"] or None

        schema_dict[column] = {
            "size": size,
            "dtype": adjust_dtype(data_type),
            "unique": row["is_unique"],
            "nullable": row["nullable"]
        }
    
    print_formatted_postgres_schema(schema_dict, table)

    # Optional JSON export (e.g., to a /dbfs/... path)
    if export_path:
        with open(export_path, "w") as f:
            json.dump(schema_dict, f, indent=4)
        print(f"[INFO] Schema exported to {export_path}")

    return schema_dict
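
For context, here is a minimal usage sketch. The JDBC URL, schema, table, and secret scope names below are placeholders I made up for illustration; the scope is assumed to contain the postgres_user and postgres_password keys the function reads:

# Hypothetical example call; replace the placeholders with your own values
jdbc_url = "jdbc:postgresql://<host>:5432/<database>"  # placeholder connection string

customers_schema = generate_postgres_schema_dict(
    jdbc_url=jdbc_url,
    schema="public",                 # assumed PostgreSQL schema
    table="customers",               # assumed table name
    secret_scope="my-secret-scope",  # assumed Databricks secret scope
    export_path="/dbfs/tmp/customers_schema.json"  # optional; omit to skip the JSON export
)

The function prints the schema as a ready-to-paste Python dictionary and returns the same dictionary, so you can feed it directly into the rest of your ETL code.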

Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa

5 REPLIES

WiliamRosa
New Contributor II

Suggestions are welcome. I hope this was helpful.

Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa

szymon_dybczak
Esteemed Contributor III

Thanks for sharing. One tip for you: next time you have something you'd like to share with the community, we have a dedicated place for that: the Knowledge Sharing Hub.

Another tip: marking a post as the best answer should only apply to threads where someone describes a problem or asks a question and your reply provides the solution. If you want to share knowledge or an interesting case from a project, use the Knowledge Sharing Hub.

Otherwise, it will simply be considered overuse of the system. Why? To give an example, a user could post 30 different tips in one day and mark them all as the best answer, unfairly moving up the leaderboard.

 

WiliamRosa
New Contributor II

Thanks for the tips, bro. I’m new to the community and still learning.

Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa

szymon_dybczak
Esteemed Contributor III

No problem, that's why I'm sharing some tips with you, so you can avoid problems in the future.

WiliamRosa
New Contributor II

Thanks so much!

Wiliam Rosa
Data Engineer | Machine Learning Engineer
LinkedIn: linkedin.com/in/wiliamrosa
