06-21-2024 09:59 PM
How should we handle this scenario? Incremental loads fail when we pass in starting_timestamp and ending_timestamp, which we derive from the pipeline event log as follows:
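from datetime import datetime

# Path to the pipeline's event log (a Delta table).
# Must be an f-string so that {pipeline_id} is substituted; pipeline_id is assumed to be defined already.
event_log_path = f"/pipelines/{pipeline_id}/system/events"

# Latest timestamp of a completed flow_progress event.
# The event log id is unique per event, so GROUP BY id followed by LIMIT 1 (with no ORDER BY)
# returns an arbitrary event rather than the latest one; take MAX over all matching events instead.
sql_query = f"""
SELECT MAX(timestamp) AS timestamp
FROM delta.`{event_log_path}`
WHERE event_type = 'flow_progress'
  AND details:flow_progress.status = 'COMPLETED'
"""
df_filtered = spark.sql(sql_query)

# Extract the timestamp from the result (MAX returns NULL when no completed event exists)
top_record = df_filtered.first()
starting_timestamp = top_record['timestamp']
if starting_timestamp is None:
    raise ValueError("No completed flow_progress events found in the event log")
ending_timestamp = datetime.now()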
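To make the intended window logic concrete, here is a minimal pure-Python sketch that runs without Spark. It assumes each event-log row has been flattened into a dict with the hypothetical keys event_type, status (standing in for details:flow_progress.status) and timestamp; the helper names latest_completed_timestamp and incremental_window are also hypothetical. It takes the maximum timestamp over all completed flow_progress events instead of one arbitrary row, and returns None when there is nothing new to load, so an empty or inverted window is never passed downstream.

```python
from datetime import datetime, timezone
from typing import Iterable, Optional, Tuple


def latest_completed_timestamp(events: Iterable[dict]) -> Optional[datetime]:
    """Return the newest timestamp among COMPLETED flow_progress events, or None."""
    # Hypothetical flattened shape: "status" stands in for details:flow_progress.status
    completed = [
        e["timestamp"]
        for e in events
        if e.get("event_type") == "flow_progress" and e.get("status") == "COMPLETED"
    ]
    # Equivalent of SELECT MAX(timestamp): None when there are no matching events
    return max(completed, default=None)


def incremental_window(
    events: Iterable[dict], now: datetime
) -> Optional[Tuple[datetime, datetime]]:
    """Return (start, end) for the next incremental load, or None if nothing to load."""
    start = latest_completed_timestamp(events)
    # Skip the run when no completed flow exists or the window would be empty/inverted
    if start is None or start >= now:
        return None
    return start, now


# Usage: the FAILED event is ignored and the later COMPLETED event wins,
# regardless of the order in which events are listed.
events = [
    {"event_type": "flow_progress", "status": "COMPLETED",
     "timestamp": datetime(2024, 6, 20, 10, 0, tzinfo=timezone.utc)},
    {"event_type": "flow_progress", "status": "COMPLETED",
     "timestamp": datetime(2024, 6, 21, 9, 0, tzinfo=timezone.utc)},
    {"event_type": "flow_progress", "status": "FAILED",
     "timestamp": datetime(2024, 6, 21, 12, 0, tzinfo=timezone.utc)},
]
window = incremental_window(events, now=datetime(2024, 6, 21, 22, 0, tzinfo=timezone.utc))
print(window)
```

Returning None rather than raising lets the caller simply skip a run when no new completed flow exists; this is one possible choice and not necessarily what the original pipeline needs.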