2 weeks ago
In a data migration project, I needed to generate the schema of a PostgreSQL table to use in my ETL process. I'd like to share the code snippet in case someone else needs it one day:
from pyspark.sql import SparkSession
import json
import os
from typing import Dict, Any
# Mapping of PostgreSQL type names (as reported by
# information_schema.columns.data_type) to Spark SQL type class names.
POSTGRES_TYPE_MAPPING = {
    # Integer types. bigint is a 64-bit integer, so it needs LongType;
    # IntegerType is 32-bit and would overflow for large values.
    'int': 'IntegerType',
    'integer': 'IntegerType',
    'bigint': 'LongType',
    'smallint': 'IntegerType',
    # 'tinyint' is not a PostgreSQL type; kept for backward compatibility.
    'tinyint': 'IntegerType',
    # Character types. information_schema reports char(n) as 'character'.
    'varchar': 'StringType',
    'character varying': 'StringType',
    'char': 'StringType',
    'character': 'StringType',
    'text': 'StringType',
    'boolean': 'BooleanType',
    # Floating-point types. In PostgreSQL, 'float' without a precision is
    # float8 (double precision); 'real' is float4.
    'double': 'DoubleType',
    'double precision': 'DoubleType',
    'float': 'DoubleType',
    'real': 'FloatType',
    'numeric': 'DecimalType',
    'decimal': 'DecimalType',
    'date': 'DateType',
    'timestamp': 'TimestampType',
    'timestamp without time zone': 'TimestampType',
    'timestamp with time zone': 'TimestampType',
    'bytea': 'BinaryType',
}
def adjust_dtype(dtype_str: str) -> str:
    """
    Map a PostgreSQL data type name to a Spark SQL type class name.

    The lookup is case-insensitive, ignores surrounding whitespace, and
    collapses runs of internal whitespace. Type modifiers in parentheses,
    such as ``(255)`` in ``varchar(255)`` or ``(6)`` in
    ``timestamp(6) without time zone``, are removed before the lookup.
    Bare names (as in ``information_schema``) and modifier-qualified names
    therefore resolve to the same entry of ``POSTGRES_TYPE_MAPPING``.

    Unknown types fall back to ``"StringType"`` after printing a warning.

    :param dtype_str: PostgreSQL type name.
    :return: Name of the Spark SQL type, e.g. ``"IntegerType"``.
    """
    import re

    # Replace "(...)" modifiers with a space, then normalize whitespace so
    # "timestamp(6) without time zone" becomes "timestamp without time zone".
    without_modifiers = re.sub(r"\([^)]*\)", " ", dtype_str)
    base_type = " ".join(without_modifiers.lower().split())
    mapped = POSTGRES_TYPE_MAPPING.get(base_type)
    if not mapped:
        print(f"[WARN] Unmapped type: {base_type}. Defaulting to StringType.")
        return "StringType"
    return mapped
def print_formatted_postgres_schema(schema_dict: Dict[str, Any], table_name: str):
    """
    Print a schema dictionary as a Python dict literal for readability.

    The output has the form ``<table_name>_schema = {...}``. Column names and
    property values are rendered with ``repr()``, so names containing quotes
    or backslashes still produce a valid Python literal.

    :param schema_dict: Mapping of column name to a dict with the keys
        ``'size'``, ``'dtype'``, ``'unique'`` and ``'nullable'``.
    :param table_name: Prefix of the printed variable name.
    """
    print(f"{table_name}_schema = {{")
    for field, props in schema_dict.items():
        # !r yields properly quoted/escaped literals; for ordinary names and
        # values the output is identical to plain single-quoting.
        print(f" {field!r}: {{")
        print(f" 'size': {props['size']!r},")
        print(f" 'dtype': {props['dtype']!r},")
        print(f" 'unique': {props['unique']!r},")
        print(f" 'nullable': {props['nullable']!r}")
        print(f" }},")
    print("}")
def generate_postgres_schema_dict(
    jdbc_url: str,
    schema: str,
    table: str,
    secret_scope: str,
    export_path: str = None
) -> Dict[str, Any]:
    """
    Build a schema dictionary for a PostgreSQL table from its metadata.

    Column metadata is read from ``information_schema`` over JDBC. The
    credentials come from the Databricks secret scope ``secret_scope``
    (keys ``postgres_user`` and ``postgres_password``). The resulting
    dictionary is printed with ``print_formatted_postgres_schema`` and,
    when ``export_path`` is given, written as JSON to that path.

    :param jdbc_url: JDBC URL of the PostgreSQL database.
    :param schema: Name of the PostgreSQL schema that contains the table.
    :param table: Name of the table.
    :param secret_scope: Databricks secret scope holding the credentials.
    :param export_path: Optional file path opened with the built-in
        ``open()``; the schema is written there as indented JSON.
        ``None`` (default) disables the export.
    :return: Mapping of column name to a dict with the keys ``'size'``,
        ``'dtype'``, ``'unique'`` and ``'nullable'``, in column order.
    """
    user = dbutils.secrets.get(scope=secret_scope, key="postgres_user")
    password = dbutils.secrets.get(scope=secret_scope, key="postgres_password")

    # The JDBC "query" option cannot bind parameters, so the names are embedded
    # as SQL string literals. Doubling single quotes keeps a name containing
    # a quote from breaking (or injecting into) the query.
    schema_literal = schema.replace("'", "''")
    table_literal = table.replace("'", "''")

    # is_unique: the column is the ONLY column of some UNIQUE constraint on
    # this table. UNIQUE constraint names are unique per schema, not
    # globally, so the key_column_usage join must match on the constraint
    # schema as well as the name. The COUNT(*) = 1 check prevents each
    # member of a composite UNIQUE(a, b) from being reported as unique.
    query = f"""
    SELECT
        cols.column_name,
        cols.ordinal_position,
        cols.is_nullable = 'YES' AS nullable,
        EXISTS (
            SELECT 1
            FROM information_schema.table_constraints tc
            JOIN information_schema.key_column_usage kcu
              ON tc.constraint_schema = kcu.constraint_schema
             AND tc.constraint_name = kcu.constraint_name
            WHERE tc.constraint_type = 'UNIQUE'
              AND tc.table_schema = cols.table_schema
              AND tc.table_name = cols.table_name
              AND kcu.column_name = cols.column_name
              AND (
                  SELECT COUNT(*)
                  FROM information_schema.key_column_usage kcu2
                  WHERE kcu2.constraint_schema = tc.constraint_schema
                    AND kcu2.constraint_name = tc.constraint_name
              ) = 1
        ) AS is_unique,
        cols.data_type,
        cols.character_maximum_length,
        cols.numeric_precision
    FROM information_schema.columns cols
    WHERE cols.table_schema = '{schema_literal}'
      AND cols.table_name = '{table_literal}'
    """

    # Read the table metadata over JDBC, configured only through .option().
    df_metadata = (
        spark.read.format("jdbc")
        .option("url", jdbc_url)
        .option("query", query)
        .option("user", user)
        .option("password", password)
        .option("driver", "org.postgresql.Driver")
        .load()
    )

    # Row order from a JDBC read is not guaranteed, so sort by the column's
    # position in the table. The result has one row per column, which is
    # small enough to sort locally.
    rows = sorted(
        df_metadata.toLocalIterator(),
        key=lambda r: r["ordinal_position"],
    )

    schema_dict = {}
    for row in rows:
        column = row["column_name"]
        # Character length for string types, precision for numeric types,
        # otherwise None.
        size = row["character_maximum_length"] or row["numeric_precision"] or None
        schema_dict[column] = {
            "size": size,
            "dtype": adjust_dtype(row["data_type"]),
            "unique": row["is_unique"],
            "nullable": row["nullable"],
        }

    print_formatted_postgres_schema(schema_dict, table)

    # Optional JSON export, as promised by the export_path parameter.
    if export_path:
        with open(export_path, "w", encoding="utf-8") as f:
            json.dump(schema_dict, f, indent=4)
        print(f"[INFO] Schema exported to {export_path}")

    return schema_dict
2 weeks ago
Suggestions are welcome. I hope this was helpful.
2 weeks ago
Suggestions are welcome. I hope this was helpful.
2 weeks ago
Thanks for sharing. One tip for you: next time you have something you'd like to share with the community, we have a dedicated place for that: Knowledge Sharing Hub.
Another tip: marking a post as the best answer should only apply to threads where someone defines a problem or asks a question, and your reply provides a solution. If you want to share knowledge or an interesting case from a project - use the knowledge sharing hub.
Otherwise, it will simply be considered overuse of the system. Why? To give you an example, a user could post 30 different tips in one day and mark them all as the best answer, unfairly moving up the leaderboard.
2 weeks ago
Thanks for the tips, bro. I'm new to the community and still learning.
2 weeks ago
No problem, that's why I'm sharing some tips with you, so you can avoid problems in the future.
2 weeks ago
tks so much
Passionate about hosting events and connecting people? Help us grow a vibrant local community—sign up today to get started!
Sign Up Now