03-10-2026 01:06 AM
Hello Community!
I would like to ask for your recommendations on SQL schema migration best practices.
In our project we currently have the SQL schema definitions and data seeding saved in SQL files. Since we are moving to higher environments, what is the recommended way to manage schema and data migrations in Databricks? I believe we need a process similar to standard database migrations, e.g. as used for APIs.
Should we consider moving to Python and using e.g. Alembic + SQLAlchemy with Databricks?
Thanks a lot in advance for the response!
03-10-2026 07:59 PM
My two cents; looking forward to better perspectives from others.
I have seen organizations use Flyway or Liquibase for schema management.
If you are looking for a Databricks-native approach, you can use DABs to deploy your schema plus a Python job that runs the seed scripts at bundle deployment. You will want to control which SQL scripts get run on subsequent runs of the seed job, to avoid reloading the same data.
4 weeks ago
Hi @maikel,
Good question, and a pretty common pain point once you start promoting to higher environments.
Databricks doesn't ship a dedicated schema migration framework, so the standard approach is what you'd expect: keep your DDL and seed SQL in version control, automate execution in order per environment, and write scripts to be idempotent so they're safe to re-run. You don't need to move to Python just to manage this well.
For environment layout, separate catalogs or schemas per environment in Unity Catalog with consistent naming works well, e.g. dev.analytics.orders, test.analytics.orders, prod.analytics.orders.
Store migrations as ordered files:
001_create_base_schemas.sql
002_create_orders_tables.sql
003_seed_reference_data.sql
Write them to be idempotent: CREATE TABLE IF NOT EXISTS, MERGE for seed data instead of plain INSERT, and ALTER TABLE for structural changes where you can.
Track what's run with a simple migration history table in each environment:
CREATE TABLE IF NOT EXISTS admin.schema_migrations (
version STRING,
applied_at TIMESTAMP
);
Your deployment job reads which versions are already applied, runs only the new files, and logs a row after each successful migration. Nothing fancy, but it works.
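The core loop is small enough to sketch in plain Python. Everything below is illustrative, not the actual runner: the history table is stood in for by an in-memory set, actually executing the SQL is left as a comment, and all names are hypothetical:

```python
import re

# Illustrative sketch only: in a real job, applied_versions would be read
# from admin.schema_migrations and each file's SQL would be executed.
applied_versions = {"001", "002"}  # versions already logged in the history table
migration_files = [
    "003_seed_reference_data.sql",
    "001_create_base_schemas.sql",
    "002_create_orders_tables.sql",
]

def pending_migrations(files, applied):
    """Return (version, file) pairs not yet applied, in version order."""
    out = []
    for name in sorted(files):
        m = re.match(r"(\d+)_.*\.sql$", name)
        if m and m.group(1) not in applied:
            out.append((m.group(1), name))
    return out

for version, name in pending_migrations(migration_files, applied_versions):
    # run the file's SQL here, then log a row in the history table
    applied_versions.add(version)

print(sorted(applied_versions))  # ['001', '002', '003']
```

Re-running it is a no-op once every version is in the set, which is exactly the idempotency property you want from the deployment job.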
For the dev → test → prod promotion side, Databricks Asset Bundles (DABs) are worth a look. They're not a schema migration tool on their own, but they handle promoting jobs and pipelines across environments with per-environment variable overrides, which pairs naturally with the versioned SQL pattern above.
On Alembic: it works, and you can keep migrations as mostly raw SQL so you retain full control over Delta DDL. Makes sense if your team is already standardized on it elsewhere. If not, the SQL-only approach is simpler and a lot more accessible to folks who aren't deep in Python.
Bottom line: you don't need Alembic to do this well. Version-controlled SQL + a migration history table + automated execution per environment is a solid, widely used pattern. Happy to dig into any piece of this further.
Cheers, Lou
3 weeks ago
Hello @Louis_Frolio!
This sounds very nice! I already have DABs implemented! A few questions on the above; I think I more or less understand the rest. What is your suggestion for keeping the version of a migration, or what do you mean by this? Is it about the prefix 001, 002, etc.? Because I think we do not want to edit existing migrations, so there will always be a single version; if any change is required, it should go in a separate file:
001_create_base_schemas.sql
002_create_orders_tables.sql
003_seed_reference_data.sql
004_base_schemas_update.sql
Also, we currently have YAML files (001_create_base_schemas.yml, 002_create_orders_tables.yml, etc.) separated per table creation and seed (exactly as you mentioned).
Shall we merge them into one migration job, with each migration as a separate task?
Additionally, every time a new migration file is added, shall we add a new task to the migration YAML? And regarding "Your deployment job reads which versions are already applied": shall we check in every file (at the top) whether it is already in admin.schema_migrations? Would it be possible for you to give me a small draft of how you imagine this?
Thank you a million in advance!
3 weeks ago
Great question! Since you already have DABs and numbered SQL files, you're most of the way there. You do not need Alembic or SQLAlchemy. Here's a concrete implementation of the migration runner pattern that plugs directly into your existing DABs setup.
The idea is simple: one job scans the migrations folder, compares it against a history table, runs only the files that haven't been applied yet, and records each one.
The history table gets created automatically by the runner, but here's what it looks like:
CREATE TABLE IF NOT EXISTS ${catalog}.admin.schema_migrations (
  version STRING NOT NULL,
  file_name STRING,
  applied_at TIMESTAMP,
  checksum STRING
);
Create a file migrations/run_migrations.py in your DABs project:
import os
import hashlib

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# These come from DABs variable overrides per environment
CATALOG = spark.conf.get("spark.databricks.migration.catalog")
MIGRATIONS_DIR = spark.conf.get("spark.databricks.migration.dir", "/Workspace/migrations/sql")


def get_applied_versions():
    """Read which migrations have already been applied."""
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {CATALOG}.admin.schema_migrations (
            version STRING NOT NULL,
            file_name STRING,
            applied_at TIMESTAMP,
            checksum STRING
        )
    """)
    rows = spark.sql(
        f"SELECT version FROM {CATALOG}.admin.schema_migrations"
    ).collect()
    return {row.version for row in rows}


def get_pending_migrations(applied):
    """Find SQL files that haven't been applied yet, sorted by version prefix."""
    files = []
    for f in sorted(os.listdir(MIGRATIONS_DIR)):
        if not f.endswith(".sql"):
            continue
        version = f.split("_")[0]  # e.g. "001" from "001_create_base_schemas.sql"
        if version not in applied:
            files.append((version, f))
    return files


def run_migration(version, file_name):
    """Execute a single migration file and record it."""
    path = os.path.join(MIGRATIONS_DIR, file_name)
    with open(path, "r") as fh:
        sql_content = fh.read()
    checksum = hashlib.md5(sql_content.encode()).hexdigest()

    # Split on semicolons to handle multi-statement files
    statements = [s.strip() for s in sql_content.split(";") if s.strip()]
    for stmt in statements:
        # Replace the ${catalog} placeholder with the actual catalog
        resolved = stmt.replace("${catalog}", CATALOG)
        print(f"  Executing: {resolved[:80]}...")
        spark.sql(resolved)

    # Record the successful migration
    spark.sql(f"""
        INSERT INTO {CATALOG}.admin.schema_migrations
        VALUES ('{version}', '{file_name}', current_timestamp(), '{checksum}')
    """)
    print(f"  Recorded migration {version}: {file_name}")


def main():
    applied = get_applied_versions()
    print(f"Already applied: {sorted(applied)}")

    pending = get_pending_migrations(applied)
    if not pending:
        print("No new migrations to apply.")
        return

    print(f"Applying {len(pending)} migration(s)...")
    for version, file_name in pending:
        print(f"\n--- Migration {version}: {file_name} ---")
        run_migration(version, file_name)
    print("\nAll migrations applied successfully.")


main()
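If you want to sanity-check the file-selection rule locally before deploying (no Spark needed), it can be exercised on its own. This is a self-contained sketch that re-implements just that rule against a temporary directory:

```python
import os
import tempfile

# Re-implements the selection rule from get_pending_migrations so it can
# run locally: .sql files only, sorted, version = prefix before the first
# underscore, skipping versions already recorded in the history table.
def pending(migrations_dir, applied):
    files = []
    for f in sorted(os.listdir(migrations_dir)):
        if not f.endswith(".sql"):
            continue
        version = f.split("_")[0]
        if version not in applied:
            files.append((version, f))
    return files

with tempfile.TemporaryDirectory() as d:
    for name in ["001_create_base_schemas.sql",
                 "002_create_orders_table.sql",
                 "003_seed_reference_data.sql",
                 "README.md"]:
        open(os.path.join(d, name), "w").close()

    # Pretend "001" and "002" are already in the history table
    result = pending(d, {"001", "002"})
    print(result)  # [('003', '003_seed_reference_data.sql')]
```

Non-.sql files are ignored and already-applied versions are skipped, so only 003 comes back.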
In your databricks.yml, add the migration runner as a job with environment-specific catalog overrides:
variables:
  catalog:
    default: dev_catalog

resources:
  jobs:
    schema_migrations:
      name: "schema-migrations-${bundle.target}"
      job_clusters:
        - job_cluster_key: migrations_cluster
          new_cluster:
            spark_version: 15.4.x-scala2.12  # use your standard runtime
            node_type_id: Standard_DS3_v2    # use your standard node type
            num_workers: 1
            # spark_conf is a cluster-level setting, so the migration job
            # gets its own small job cluster to carry these values
            spark_conf:
              spark.databricks.migration.catalog: ${var.catalog}
              spark.databricks.migration.dir: ${workspace.file_path}/migrations/sql
      tasks:
        - task_key: run_migrations
          job_cluster_key: migrations_cluster
          spark_python_task:
            python_file: ./migrations/run_migrations.py

targets:
  dev:
    variables:
      catalog: dev_catalog
  test:
    variables:
      catalog: test_catalog
  prod:
    variables:
      catalog: prod_catalog
Keep them exactly as you have them: numbered, one per change, never edited after creation:
migrations/sql/
  001_create_base_schemas.sql
  002_create_orders_table.sql
  003_seed_reference_data.sql
  004_add_status_column.sql   ← new changes = new file
Example migration file (001_create_base_schemas.sql):
CREATE SCHEMA IF NOT EXISTS ${catalog}.analytics;
CREATE SCHEMA IF NOT EXISTS ${catalog}.admin;
Example seed file (003_seed_reference_data.sql), using MERGE for idempotency:
MERGE INTO ${catalog}.analytics.order_status AS target
USING (
  SELECT * FROM VALUES
    ('NEW', 'New Order'),
    ('SHIPPED', 'Order Shipped'),
    ('DELIVERED', 'Order Delivered')
  AS source(code, description)
) AS source
ON target.code = source.code
WHEN NOT MATCHED THEN INSERT *;
Adding a new migration: drop the next numbered .sql file into migrations/sql/, redeploy the bundle, and run the job. No new tasks and no YAML changes are needed; the runner picks it up automatically.
The runner is safe to re-run: it always checks the history table first, and already-applied migrations are skipped.
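One more thing the checksum column buys you: detecting when an already-applied file was edited after the fact, which the "never edit after creation" rule forbids. The runner above doesn't do this; what follows is only a sketch of how such a guard could look, reusing the same md5 scheme, with in-memory dicts standing in for the history table and the files on disk:

```python
import hashlib

def md5_of(text):
    """Same checksum scheme the runner records in schema_migrations."""
    return hashlib.md5(text.encode()).hexdigest()

def find_drift(recorded, current_files):
    """Return versions whose current file content no longer matches the
    checksum stored when the migration was applied.

    recorded:      dict of version -> md5 at apply time (from the table)
    current_files: dict of version -> current file text (from the repo)
    """
    return sorted(
        v for v, checksum in recorded.items()
        if v in current_files and md5_of(current_files[v]) != checksum
    )

# Example: migration 002 was edited after it had already been applied
original_002 = "CREATE TABLE t (id INT);"
recorded = {"001": md5_of("CREATE SCHEMA s;"), "002": md5_of(original_002)}
current = {"001": "CREATE SCHEMA s;", "002": "CREATE TABLE t (id INT, name STRING);"}

print(find_drift(recorded, current))  # ['002']
```

Failing the job when drift is found keeps every environment's history honest: a changed 002 has to become a new 005 instead.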
3 weeks ago