Error while reading streaming source from DLT Pipeline
06-20-2024 03:01 AM
Hi All,
I am getting an error when reading the streaming source:
Full load - it loads fine
Incremental load - I am facing this error
This is the piece of code that I am using:
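(The snippet itself did not attach. For context, an incremental Change Data Feed read in a DLT pipeline typically looks roughly like the sketch below; the table name and timestamp here are placeholders, not the actual values from my pipeline.)

import dlt

@dlt.table
def incremental_load():
    return (
        spark.readStream.format("delta")
        .option("readChangeFeed", "true")                     # read the CDF instead of full snapshots
        .option("startingTimestamp", "2024-06-20 00:00:00")   # placeholder timestamp
        .table("source_table")                                # placeholder source table
    )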
06-20-2024 11:31 PM
@Bhaskar29
It looks like the source table got vacuumed; that's why the stream is unable to read the CDF for the given timestamp.
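You can confirm this by checking the table history for a recent VACUUM. A quick sketch (substitute your actual source table name for source_table):

# Illustrative check - source_table is a placeholder.
# A VACUUM START/END entry in the history means older data files
# (and the CDF versions that depend on them) may have been deleted.
history_df = spark.sql("DESCRIBE HISTORY source_table")
history_df.select("version", "timestamp", "operation").orderBy("version", ascending=False).show()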
06-21-2024 09:59 PM
How do we handle this scenario? Incremental loads are failing when we pass the starting_timestamp and ending_timestamp that we derive from the event log:
from datetime import datetime

# Path to the DLT event log (replace {pipeline_id} with the actual pipeline ID)
event_log_path = "/pipelines/{pipeline_id}/system/events"

# Find the timestamp of the most recently completed flow update
sql_query = f"""
SELECT id
     , MAX(timestamp) AS timestamp
FROM delta.`{event_log_path}`
WHERE event_type = 'flow_progress'
  AND details:flow_progress.status = 'COMPLETED'
GROUP BY id
ORDER BY timestamp DESC
LIMIT 1
"""
df_filtered = spark.sql(sql_query)

# Extract the timestamp from the result
top_record = df_filtered.first()
starting_timestamp = top_record['timestamp']
ending_timestamp = datetime.now()
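One way to guard against the vacuumed-history case is to clamp the derived timestamp to the earliest commit the source table still retains. A minimal sketch, assuming the source is a Delta table named source_table (adjust names to your setup); depending on your VACUUM and log retention settings, a one-time full refresh of the pipeline may still be needed:

# Sketch only - source_table is a placeholder for the actual CDF source.
# Earliest commit timestamp still present in the table history.
earliest = (
    spark.sql("DESCRIBE HISTORY source_table")
    .orderBy("version")
    .first()["timestamp"]
)

# Never ask the CDF for a timestamp older than the table can serve.
effective_start = max(starting_timestamp, earliest)

df = (
    spark.readStream.format("delta")
    .option("readChangeFeed", "true")
    .option("startingTimestamp", effective_start.strftime("%Y-%m-%d %H:%M:%S"))
    .table("source_table")
)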
06-21-2024 12:47 AM
Thanks for the info.

