Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

SQL schemas migration

maikel
Contributor II

Hello Community!

I would like to ask for your recommendations on best practices for SQL schema migrations.
In our project we currently keep our SQL schema definitions and data seeding in SQL files. As we promote to higher environments, what is the recommended way to manage schema and data migrations in Databricks? I believe we need a process similar to standard database migrations, e.g. as used for APIs.
Should we consider moving to Python, e.g. Alembic + SQLAlchemy, for Databricks?

Thanks a lot in advance for the response!

1 ACCEPTED SOLUTION


anuj_lathi
Databricks Employee

Great question — and since you already have DABs and numbered SQL files, you're most of the way there. You do not need Alembic or SQLAlchemy. Here's a concrete implementation of the migration runner pattern that plugs directly into your existing DABs setup.

The Pattern

The idea is simple:

  1. Keep your numbered SQL migration files as-is (001, 002, etc.)
  2. Add a migration history table per environment to track what's been applied
  3. Add a single migration runner task in your DABs bundle that runs all unapplied migrations in order
  4. Each migration runs exactly once — no editing old files, new changes go in new files

Step 1: Migration History Table

This gets created automatically by the runner, but here's what it looks like:

CREATE TABLE IF NOT EXISTS ${catalog}.admin.schema_migrations (
  version STRING NOT NULL,
  file_name STRING,
  applied_at TIMESTAMP DEFAULT current_timestamp(),
  checksum STRING
);
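The checksum column is optional, but it lets you detect when an already-applied migration file has been edited after the fact, which should never happen under this pattern. A minimal, Spark-free sketch of that check (the helper names and sample strings here are illustrative, not part of the runner above):

```python
import hashlib

def file_checksum(sql_content: str) -> str:
    """MD5 of a migration file's contents, matching what the runner records."""
    return hashlib.md5(sql_content.encode()).hexdigest()

def detect_drift(applied: dict, current_files: dict) -> list:
    """Return versions whose on-disk contents no longer match the recorded
    checksum. `applied` maps version -> checksum read from schema_migrations;
    `current_files` maps version -> file contents read from migrations/sql/."""
    return sorted(
        version
        for version, recorded in applied.items()
        if version in current_files
        and file_checksum(current_files[version]) != recorded
    )

original = "CREATE SCHEMA IF NOT EXISTS ${catalog}.analytics;"
applied = {"001": file_checksum(original)}

print(detect_drift(applied, {"001": original}))               # [] - file untouched
print(detect_drift(applied, {"001": original + " -- edit"}))  # ['001'] - edited after apply
```

You could run this as a pre-flight step in the runner and fail the job when drift is detected, so an edited historical file never silently diverges between environments.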

 

Step 2: Migration Runner (Python Task)

Create a file migrations/run_migrations.py in your DABs project:

import os
import hashlib

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# These come from DABs variable overrides per environment
CATALOG = spark.conf.get("spark.databricks.migration.catalog")
MIGRATIONS_DIR = spark.conf.get("spark.databricks.migration.dir", "/Workspace/migrations/sql")


def get_applied_versions():
    """Read which migrations have already been applied."""
    spark.sql(f"""
        CREATE TABLE IF NOT EXISTS {CATALOG}.admin.schema_migrations (
            version STRING NOT NULL,
            file_name STRING,
            applied_at TIMESTAMP,
            checksum STRING
        )
    """)
    rows = spark.sql(
        f"SELECT version FROM {CATALOG}.admin.schema_migrations"
    ).collect()
    return {row.version for row in rows}


def get_pending_migrations(applied):
    """Find SQL files that haven't been applied yet, sorted by version prefix."""
    files = []
    for f in sorted(os.listdir(MIGRATIONS_DIR)):
        if not f.endswith(".sql"):
            continue
        version = f.split("_")[0]  # e.g. "001" from "001_create_base_schemas.sql"
        if version not in applied:
            files.append((version, f))
    return files


def run_migration(version, file_name):
    """Execute a single migration file and record it."""
    path = os.path.join(MIGRATIONS_DIR, file_name)
    with open(path, "r") as fh:
        sql_content = fh.read()

    checksum = hashlib.md5(sql_content.encode()).hexdigest()

    # Split on semicolons to handle multi-statement files.
    # Note: this naive split breaks on ';' inside string literals or
    # comments, so keep migration files free of those.
    statements = [s.strip() for s in sql_content.split(";") if s.strip()]
    for stmt in statements:
        # Replace the ${catalog} placeholder with the actual catalog
        resolved = stmt.replace("${catalog}", CATALOG)
        print(f"  Executing: {resolved[:80]}...")
        spark.sql(resolved)

    # Record the successful migration
    spark.sql(f"""
        INSERT INTO {CATALOG}.admin.schema_migrations
        VALUES ('{version}', '{file_name}', current_timestamp(), '{checksum}')
    """)
    print(f"  Recorded migration {version}: {file_name}")


def main():
    applied = get_applied_versions()
    print(f"Already applied: {sorted(applied)}")

    pending = get_pending_migrations(applied)
    if not pending:
        print("No new migrations to apply.")
        return

    print(f"Applying {len(pending)} migration(s)...")
    for version, file_name in pending:
        print(f"\n--- Migration {version}: {file_name} ---")
        run_migration(version, file_name)

    print("\nAll migrations applied successfully.")


if __name__ == "__main__":
    main()
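The selection logic in get_pending_migrations is pure string handling, so it can be sanity-checked locally without Spark. A standalone sketch of the same filtering (the file names below are illustrative):

```python
def pending_migrations(file_names, applied):
    """Mirror of the runner's selection: keep .sql files whose numeric
    version prefix is not yet recorded, in sorted (i.e. version) order."""
    pending = []
    for f in sorted(file_names):
        if not f.endswith(".sql"):
            continue
        version = f.split("_")[0]
        if version not in applied:
            pending.append((version, f))
    return pending

files = [
    "003_seed_reference_data.sql",
    "001_create_base_schemas.sql",
    "002_create_orders_table.sql",
    "README.md",  # ignored: not a .sql file
]
print(pending_migrations(files, applied={"001"}))
# [('002', '002_create_orders_table.sql'), ('003', '003_seed_reference_data.sql')]
```

Sorting by file name is what makes zero-padded prefixes (001, 002, ... 010) important: without the padding, "10" would sort before "2".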

 

Step 3: DABs Bundle Configuration

In your databricks.yml, add the migration runner as a job with environment-specific catalog overrides:

variables:
  catalog:
    default: dev_catalog

resources:
  jobs:
    schema_migrations:
      name: "schema-migrations-${bundle.environment}"
      tasks:
        - task_key: run_migrations
          existing_cluster_id: ${var.cluster_id}
          spark_python_task:
            python_file: ./migrations/run_migrations.py
            parameters: []
          spark_conf:
            spark.databricks.migration.catalog: ${var.catalog}
            spark.databricks.migration.dir: /Workspace/${workspace.root_path}/migrations/sql

environments:
  dev:
    variables:
      catalog: dev_catalog
  test:
    variables:
      catalog: test_catalog
  prod:
    variables:
      catalog: prod_catalog

 

Step 4: Your SQL Migration Files

Keep them exactly as you have them — numbered, one per change, never edited after creation:

migrations/sql/
  001_create_base_schemas.sql
  002_create_orders_table.sql
  003_seed_reference_data.sql
  004_add_status_column.sql      ← new changes = new file

 

Example migration file (001_create_base_schemas.sql):

CREATE SCHEMA IF NOT EXISTS ${catalog}.analytics;
CREATE SCHEMA IF NOT EXISTS ${catalog}.admin;

 

Example seed file (003_seed_reference_data.sql) — use MERGE for idempotency:

MERGE INTO ${catalog}.analytics.order_status AS target
USING (
  SELECT * FROM VALUES
    ('NEW', 'New Order'),
    ('SHIPPED', 'Order Shipped'),
    ('DELIVERED', 'Order Delivered')
  AS source(code, description)
) AS source
ON target.code = source.code
WHEN NOT MATCHED THEN INSERT *;
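What makes the MERGE idempotent is that re-running it only inserts rows whose key is absent. A dict-based sketch of that upsert-by-key behavior (plain Python for illustration, not Spark code):

```python
def merge_seed(table: dict, seed_rows: list) -> dict:
    """Insert a row only when its code is not already present, mirroring
    MERGE ... WHEN NOT MATCHED THEN INSERT keyed on `code`."""
    for code, description in seed_rows:
        if code not in table:
            table[code] = description
    return table

seed = [
    ("NEW", "New Order"),
    ("SHIPPED", "Order Shipped"),
    ("DELIVERED", "Order Delivered"),
]

table = merge_seed({}, seed)   # first run: 3 rows inserted
table = merge_seed(table, seed)  # second run: every key matches, nothing inserted
print(len(table))  # 3 - same state no matter how many times the seed runs
```

A plain INSERT, by contrast, would add three duplicate rows on every rerun, which is exactly why MERGE is the right idiom for seed files.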

 

How It Works in Practice

Adding a new migration:

  1. Create 005_add_customer_email.sql in migrations/sql/
  2. Commit and push
  3. databricks bundle deploy -e test, then databricks bundle run -e test schema_migrations → the runner sees 005 is not in history → applies it
  4. The same two commands with -e prod do the same for prod

The runner is safe to re-run — it always checks the history table first. Already-applied migrations are skipped.

To Answer Your Specific Questions

  • Version = the prefix (001, 002, etc.). Exactly right — never edit old migrations, always add new files.
  • One migration job, not one task per file — the single Python runner task handles all files. No need to edit YAML when adding migrations.
  • Version check is in the runner, not in each SQL file — the runner reads the history table once, then only executes files whose version prefix isn't recorded yet.
  • No need for Alembic — this pattern gives you the same ordered, idempotent, environment-aware migrations without adding Python ORM complexity. Your migrations stay as plain SQL, which is easier for the whole team to work with.
Anuj Lathi
Solutions Engineer @ Databricks


5 REPLIES

pradeep_singh
Contributor III

My two cents; looking forward to better perspectives from others.

I have seen organizations use Flyway or Liquibase for schema management.

If you are looking for a Databricks-native approach, you can use DABs to deploy your schemas, plus a Python job that runs the seed scripts at bundle deployment. You will want to control which SQL scripts run on subsequent runs of the seed job, to avoid reloading the same data.

Thank You
Pradeep Singh - https://www.linkedin.com/in/dbxdev

Louis_Frolio
Databricks Employee

 

Hi @maikel ,

Good question, and a pretty common pain point once you start promoting to higher environments.

Databricks doesn't ship a dedicated schema migration framework, so the standard approach is what you'd expect: keep your DDL and seed SQL in version control, automate execution in order per environment, and write scripts to be idempotent so they're safe to re-run. You don't need to move to Python just to manage this well.

For environment layout, separate catalogs or schemas per environment in Unity Catalog with consistent naming works well — e.g. dev.analytics.orders, test.analytics.orders, prod.analytics.orders.

Store migrations as ordered files:

  • 001_create_base_schemas.sql
  • 002_create_orders_tables.sql
  • 003_seed_reference_data.sql

Write them idempotent — CREATE TABLE IF NOT EXISTS, MERGE for seed data instead of plain INSERT, ALTER TABLE for structural changes where you can.

Track what's run with a simple migration history table in each environment:

CREATE TABLE IF NOT EXISTS admin.schema_migrations (
  version STRING,
  applied_at TIMESTAMP
);

Your deployment job reads which versions are already applied, runs only the new files, and logs a row after each successful migration. Nothing fancy, but it works.

For the dev → test → prod promotion side, Databricks Asset Bundles (DABs) is worth a look. It's not a schema migration tool on its own, but it handles promoting jobs and pipelines across environments with variable overrides per environment — pairs naturally with the versioned SQL pattern above.

On Alembic: it works, and you can keep migrations as mostly raw SQL so you retain full control over Delta DDL. Makes sense if your team is already standardized on it elsewhere. If not, the SQL-only approach is simpler and a lot more accessible to folks who aren't deep in Python.

Bottom line — you don't need Alembic to do this well. Version-controlled SQL + a migration history table + automated execution per environment is a solid, widely-used pattern. Happy to dig into any piece of this further.

Cheers, Lou

maikel
Contributor II

Hello @Louis_Frolio !

this sounds very nice! I already have DABs implemented! A few questions about the above; I think I more or less understand the rest. What do you mean by keeping the version of a migration? Is it the prefix 001, 002, etc.? We do not want to edit existing migrations, so each file will only ever have one version; if a change is required, it should go in a separate file:

001_create_base_schemas.sql
002_create_orders_tables.sql
003_seed_reference_data.sql
004_base_schemas_update.sql

Also, we currently have YAML files (001_create_base_schemas.yml, 002_create_orders_tables.yml, etc.) separated per table creation and seed (exactly as you mentioned).

Should we merge them into one migration job, with each migration as a separate task? And every time a new migration file is added, should we add a new task to the migration YAML? Regarding "Your deployment job reads which versions are already applied": should every file check (at the top) whether it is already in admin.schema_migrations? Could you give me a small draft of how you imagine this?

Thank you a million in advance!


anuj_lathi
Databricks Employee

(Accepted solution, shown in full above.)

Anuj Lathi
Solutions Engineer @ Databricks

maikel
Contributor II

@anuj_lathi and @Louis_Frolio 

thank you very much! This is a really great approach and example!