I am using Delta Live Tables and have defined my pipeline with the code below. My understanding is that Delta Live Tables sets a streaming checkpoint automatically. The pipeline's storage destination is configured through the Unity Catalog and schema settings in the pipeline configuration.
Since I am reading JSON messages and many source files accumulate, I eventually want to run a cleanup process that deletes old files that have already been ingested into the streaming table. I thought I could determine which files are safe to delete by inspecting the checkpoint, but I cannot find where the checkpoints are written or how to access them. When I try to set a checkpoint directory manually (see the rough sketch after the pipeline code), nothing gets created at that location when the pipeline runs.
import dlt
from pyspark.sql.functions import current_timestamp, to_timestamp

# schema and sink_dir are defined earlier in the pipeline notebook

@dlt.table(
    name="newdata_raw",
    table_properties={"quality": "bronze"},
    temporary=False,
)
def create_table():
    # Incrementally ingest JSON files with Auto Loader and add a load timestamp
    query = (
        spark.readStream.format("cloudFiles")
        .schema(schema)
        .option("cloudFiles.format", "json")
        .load(sink_dir + "partition=*/")
        .selectExpr("newRecord.*")
        .withColumn("LOAD_DT", to_timestamp(current_timestamp()))
    )
    return query
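
For reference, here is roughly what my manual checkpoint attempt looked like. The path is just a placeholder, and I may well be setting the option in the wrong place, but nothing ever appears at that location:

# Inside create_table(), I added an explicit checkpointLocation option
# (the path below is only a placeholder I made up for illustration)
query = (
    spark.readStream.format("cloudFiles")
    .schema(schema)
    .option("cloudFiles.format", "json")
    .option("checkpointLocation", "/Volumes/my_catalog/my_schema/checkpoints/newdata_raw")
    .load(sink_dir + "partition=*/")
)

So my questions are: where does Delta Live Tables actually store its checkpoints when the pipeline writes to Unity Catalog, and is there a supported way to read them so I can identify which source files have already been processed?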