How do we handle this scenario? Incremental loads are failing when we pass the starting_timestamp and ending_timestamp, which we derive from the event log:

 

# Derive the [starting_timestamp, ending_timestamp] window for an incremental
# load from the pipeline's event log: start at the most recent COMPLETED
# flow_progress event and end at "now".

# Imported here so the snippet is self-contained; `datetime.now()` below needs it.
from datetime import datetime

# Path to the pipeline's event log Delta table.
# NOTE(review): this is a plain (non-f) string, so `{pipeline_id}` is NOT
# interpolated -- substitute the real pipeline ID before running.
event_log_path = "/pipelines/{pipeline_id}/system/events"

# Fetch the single most recent COMPLETED flow_progress event.
# The explicit ORDER BY is what makes LIMIT 1 deterministic: without it Spark
# may return any matching row. Grouping by `id` does not help, because `id`
# identifies an individual event-log record (per the Databricks event log
# schema), so each group holds exactly one row.
sql_query = f"""
SELECT id
,timestamp
FROM delta.`{event_log_path}`
WHERE event_type = 'flow_progress'
AND details:flow_progress.status = 'COMPLETED'
ORDER BY timestamp DESC
LIMIT 1
"""
df_filtered = spark.sql(sql_query)

# `first()` returns None when the query matched no rows (e.g. the pipeline has
# never completed a flow); fail with a clear message instead of a TypeError.
top_record = df_filtered.first()
if top_record is None:
    raise RuntimeError(
        f"No COMPLETED flow_progress event found in event log at {event_log_path}; "
        "cannot derive starting_timestamp for the incremental load."
    )

# Lower bound of the incremental window: time of the last completed flow.
starting_timestamp = top_record['timestamp']
# Upper bound of the incremental window.
# NOTE(review): datetime.now() is naive driver-local time -- confirm it is in
# the same time zone as the collected event-log timestamp.
ending_timestamp = datetime.now()