Thank you for your response, @stbjelcevic.
So I tried refreshing the catalog and the schema after the table was deleted in PostgreSQL and in Unity Catalog (the synced one), and I also removed the pipeline (a sketch of that cleanup is at the end of this reply):
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.database import (
    NewPipelineSpec,
    SyncedDatabaseTable,
    SyncedTableSchedulingPolicy,
    SyncedTableSpec,
)

from my_project.data_accessor import data_handler
from my_project.settings import settings

# Databricks workspace client (host anonymised)
w = WorkspaceClient(
    host="https://adb-XXXXXXXXXXXXXXX.azuredatabricks.net/",
    auth_type="azure-cli",
)

# Environments anonymised
env = "dev"
read_env = "prod"
settings.subscription_env = env

# Primary key definitions (table names anonymised as examples)
datas = {
    "table_asset_xref": [
        "ID",
        "CONTEXT_ID",
        "ASSET_ID",
        "ASSET_TYPE",
    ],
}

for table_name, pk_columns in datas.items():
    # Refresh foreign catalog/schema (names anonymised)
    data_handler.fetch_all(
        f"REFRESH FOREIGN CATALOG catalog_project_standard_{env}_region"
    )
    print(f"Refreshed foreign catalog for environment: {env}")

    data_handler.fetch_all(
        f"REFRESH FOREIGN SCHEMA catalog_project_standard_{env}_region.sync_schema"
    )
    print(f"Refreshed foreign schema for environment: {env}")

    # Create synced table
    synced_table = w.database.create_synced_database_table(
        SyncedDatabaseTable(
            # Target table in PostgreSQL (names anonymised)
            name=(
                f"catalog_project_standard_{env}_region.sync_schema.{table_name}"
            ),
            # Matches Databricks DB connection configuration (anonymised)
            database_instance_name="db_instance",
            logical_database_name=f"catalog_project_foreign_{read_env}_region",
            spec=SyncedTableSpec(
                # Source table full name (anonymised)
                source_table_full_name=(
                    f"catalog_project_foreign_{read_env}_region.db.{table_name}"
                ),
                primary_key_columns=pk_columns,
                scheduling_policy=SyncedTableSchedulingPolicy.SNAPSHOT,
                create_database_objects_if_missing=True,
                new_pipeline_spec=NewPipelineSpec(
                    storage_catalog=f"catalog_project_standard_{env}_region",
                    storage_schema="sync_schema",
                ),
            ),
        )
    )
    print(f"Created synced table: {synced_table.name}")

    # Retrieve pipeline ID and update configuration
    pipeline_id = synced_table.data_synchronization_status.pipeline_id
    w.pipelines.update(
        pipeline_id=pipeline_id,
        budget_policy_id="00000000-0000-0000-0000-000000000000",  # anonymised
        name=f"Sync to PostgreSQL {table_name}",
        catalog=f"catalog_project_standard_{env}_region",
        schema=f"db_schema_{env}",
        tags={
            "DOMAIN": "DATA",
            "PROJECT": "DATA_PLATFORM",
            "PROCESS": "SYNC_PIPELINE",
            "TOOLS": "DATABRICKS",
            "TARGET": "POSTGRESQL",
        },
    )
    print(f"Updated pipeline: {pipeline_id}")
Here is the sample code I'm using. The `data_handler` object is connected to the SQL Warehouse of the same workspace. It's a custom package that simplifies configuration for us depending on the "env" we select; under the hood, it executes the SQL command on the warehouse. I don't get any errors on the SQL commands.
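For context, `data_handler.fetch_all` conceptually behaves like the sketch below. This is not the actual implementation of our package (which resolves the connection details from its settings for the selected env); the hostname, HTTP path, and token shown here are placeholders.

# Minimal sketch of what data_handler.fetch_all does conceptually, using the
# databricks-sql-connector. The connection values are placeholders; the real
# package resolves them from its own settings for the selected env.
from databricks import sql as dbsql

def fetch_all(statement: str):
    with dbsql.connect(
        server_hostname="adb-XXXXXXXXXXXXXXX.azuredatabricks.net",  # placeholder
        http_path="/sql/1.0/warehouses/<warehouse-id>",  # placeholder
        access_token="<personal-access-token>",  # placeholder
    ) as connection:
        with connection.cursor() as cursor:
            cursor.execute(statement)
            return cursor.fetchall()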
But sadly the issue persists. I also changed my web browser, thinking it could be a browser cache problem, but the result is the same.
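For completeness, the pipeline removal I mentioned at the top was done roughly along these lines (only a sketch, the exact call I used may differ; the name filter here is illustrative):

# Sketch of the cleanup step: removing the old sync pipeline before
# recreating the synced table. The name filter is illustrative only.
for p in w.pipelines.list_pipelines(filter="name LIKE 'Sync to PostgreSQL%'"):
    w.pipelines.delete(pipeline_id=p.pipeline_id)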