Etyr
Contributor II

Thank you for your response, @stbjelcevic.

I tried refreshing the catalog and the schema after deleting the table in PostgreSQL and in Unity Catalog (the synced one) and removing the pipeline:


from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import (
    NewPipelineSpec,
    SyncedDatabaseTable,
    SyncedTableSchedulingPolicy,
    SyncedTableSpec,
)

from my_project.data_accessor import data_handler
from my_project.settings import settings

# Client for the target Databricks workspace, authenticated through the
# Azure CLI (auth_type="azure-cli"); the host URL is anonymised.
w = WorkspaceClient(
    auth_type="azure-cli",
    host="https://adb-XXXXXXXXXXXXXXX.azuredatabricks.net/",
)

# `env` names the catalog the synced tables are written to; `read_env` names
# the catalog the source tables are read from (both anonymised).
env, read_env = "dev", "prod"
settings.subscription_env = env

# Mapping of table name -> primary-key columns for each synced table to
# create (table names anonymised as examples).
datas = dict(
    table_asset_xref=["ID", "CONTEXT_ID", "ASSET_ID", "ASSET_TYPE"],
)

# Unity Catalog catalog that receives the synced tables and the pipeline
# storage; computed once because every table below reuses it.
standard_catalog = f"catalog_project_standard_{env}_region"

# Tags applied to every sync pipeline created below.
PIPELINE_TAGS = {
    "DOMAIN": "DATA",
    "PROJECT": "DATA_PLATFORM",
    "PROCESS": "SYNC_PIPELINE",
    "TOOLS": "DATABRICKS",
    "TARGET": "POSTGRESQL",
}

# Refresh catalog/schema metadata once, before any table is created. The
# statements do not depend on the table being processed, so running them
# inside the loop only repeated the same warehouse round-trips.
# NOTE(review): REFRESH FOREIGN applies to foreign (federated) catalogs —
# confirm that `standard_catalog` is one, otherwise this refresh is a no-op
# for the stale-state problem being investigated.
data_handler.fetch_all(f"REFRESH FOREIGN CATALOG {standard_catalog}")
print(f"Refreshed foreign catalog for environment: {env}")

data_handler.fetch_all(f"REFRESH FOREIGN SCHEMA {standard_catalog}.sync_schema")
print(f"Refreshed foreign schema for environment: {env}")

for table_name, pk_columns in datas.items():
    # Create the synced table (target in PostgreSQL, names anonymised).
    synced_table = w.database.create_synced_database_table(
        SyncedDatabaseTable(
            name=f"{standard_catalog}.sync_schema.{table_name}",
            # Matches the Databricks DB connection configuration (anonymised).
            database_instance_name="db_instance",
            logical_database_name=f"catalog_project_foreign_{read_env}_region",
            spec=SyncedTableSpec(
                # Fully qualified source table (anonymised).
                source_table_full_name=(
                    f"catalog_project_foreign_{read_env}_region.db.{table_name}"
                ),
                primary_key_columns=pk_columns,
                scheduling_policy=SyncedTableSchedulingPolicy.SNAPSHOT,
                create_database_objects_if_missing=True,
                new_pipeline_spec=NewPipelineSpec(
                    storage_catalog=standard_catalog,
                    storage_schema="sync_schema",
                ),
            ),
        )
    )

    print(f"Created synced table: {synced_table.name}")

    # Fail loudly if the create response carries no pipeline id, instead of
    # raising an opaque AttributeError on a missing status or calling
    # pipelines.update with pipeline_id=None.
    status = synced_table.data_synchronization_status
    pipeline_id = status.pipeline_id if status is not None else None
    if not pipeline_id:
        raise RuntimeError(
            f"No pipeline_id returned for synced table {synced_table.name}; "
            "cannot update its pipeline settings."
        )

    # NOTE(review): confirm that pipelines.update does not replace the whole
    # pipeline configuration with only the fields below (compare
    # w.pipelines.get(pipeline_id).spec before and after this call).
    # NOTE(review): `schema` differs from storage_schema="sync_schema" used at
    # creation — confirm this is intended.
    w.pipelines.update(
        pipeline_id=pipeline_id,
        budget_policy_id="00000000-0000-0000-0000-000000000000",  # anonymised
        name=f"Sync to PostgreSQL {table_name}",
        catalog=standard_catalog,
        schema=f"db_schema_{env}",
        tags=PIPELINE_TAGS,
    )

    print(f"Updated pipeline: {pipeline_id}")

Here is the sample code I'm using. The `data_handler` object is connected to the SQL warehouse in the same workspace. It's a custom package that simplifies configuration for us based on the selected "env"; under the hood, it executes the SQL commands on the warehouse. The SQL commands run without errors.

Unfortunately, the issue persists. I also switched web browsers in case it was a browser caching issue, but the result is the same.