Hello,
I'm trying to create a Delta table sink to store delete requests that come into our system and are successfully ingested into the bronze layer with Auto Loader. Since I want a delete control table that gets updated after the data deletion, I created a Delta table sink in my DLT pipeline.
The pipeline runs fine the first time, but every subsequent run fails with the following error:
com.databricks.sql.managedcatalog.UnityCatalogServiceException: [RequestId=f1e909e8-5738-4fe0-b57c-aa813b82b922 ErrorClass=TABLE_ALREADY_EXISTS.RESOURCE_ALREADY_EXISTS] Table 'delete_requests' already exists
This is how I defined the sink:
import dlt
from pyspark.sql.functions import col, lit
from pyspark.sql.types import DateType

dlt.create_sink(
    name="delete_requests",
    format="delta",
    options={"tableName": "{{my_catalog}}.{{my_schema}}.delete_requests"}
)

@dlt.append_flow(name="delta_sink_flow", target="delete_requests")
def delta_sink_flow():
    return (
        spark.readStream.table("{{my_catalog}}.{{my_schema}}.delete_user_service_reports")
        .withColumn("request_date", col("response_timestamp").cast("date"))
        .withColumn("delete_date", lit("1970-01-01").cast(DateType()))
        .select("user_id", "request_date", "delete_date")
        .distinct()
    )
Any idea why I'm getting this error?