Error while reading streaming source from DLT pipeline

Bhaskar29
Databricks Partner

Hi All, 

I am getting this error when reading the streaming source.

Full load: it loads successfully.

Incremental load: I am facing this error.

This is the piece of code that I am using:

import time

# logger, spark, starting_timestamp, ending_timestamp and
# primary_delta_table_path_url are expected to be defined elsewhere.

def gim_suppliers_ln():
    logger.info("Starting __cn_gim_suppliers")
    overall_start_time = time.time()
    try:
        # Log the start time of the readStream execution
        read_stream_start_time = time.time()
        logger.info(f"readStream execution started at {read_stream_start_time}")

        spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
        spark._jvm.com.databricks.sql.transaction.tahoe.DeltaLog.clearCache()
        spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
        spark.conf.set("spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled", "true")

        # Read the change data feed from the source streaming table
        df = (
            spark.readStream
            .format("delta")
            .option("readChangeFeed", "true")
            .option("startingTimestamp", starting_timestamp)
            .option("endingTimestamp", ending_timestamp)
            .load(primary_delta_table_path_url)
        )

        # Log the end time of the readStream execution
        read_stream_end_time = time.time()
        logger.info(f"readStream execution completed at {read_stream_end_time}")
        logger.info(f"readStream execution took {read_stream_end_time - read_stream_start_time} seconds")
        return df
    except Exception:
        # Log the failure with its traceback, then re-raise so the run fails visibly
        logger.exception("readStream failed in gim_suppliers_ln")
        raise
 
Any help would be appreciated

[Screenshot of the error: Bhaskar29_0-1718877478034.png]

 

daniel_sahal
Databricks MVP

@Bhaskar29 
It looks like the source table was vacuumed, which is why the stream cannot read the change data feed (CDF) for the given timestamp.

How do we handle this scenario? Incremental loads fail when we pass the starting_timestamp and ending_timestamp, which we derive from the event log:

 

from datetime import datetime

# Path to the pipeline's event log Delta table
# ({pipeline_id} is a placeholder for the actual pipeline ID)
event_log_path = "/pipelines/{pipeline_id}/system/events"

# Find the most recent completed flow_progress event.
# ORDER BY is needed so that LIMIT 1 returns the latest row, not an arbitrary one.
sql_query = f"""
SELECT id
,MAX(timestamp) AS timestamp
FROM delta.`{event_log_path}`
WHERE event_type = 'flow_progress'
AND details:flow_progress.status = 'COMPLETED'
GROUP BY id
ORDER BY timestamp DESC
LIMIT 1
"""
df_filtered = spark.sql(sql_query)

# Extract the timestamp from the result
top_record = df_filtered.first()
starting_timestamp = top_record['timestamp']
ending_timestamp = datetime.now()
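
One possible way to handle the vacuumed-CDF case is to check, before starting the incremental read, whether the derived starting_timestamp is still inside the table's retention window, and to fall back to a full load when it is not. The following is only a minimal sketch of that decision, not a confirmed fix. The function name plan_incremental_read and its parameters are hypothetical, the 7-day default is an assumption that should be replaced with the retention actually used when vacuuming the source table, and naive timestamps are assumed to be in UTC.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional, Tuple


def _as_utc(ts: datetime) -> datetime:
    # Assumption: timestamps without tzinfo are already expressed in UTC.
    return ts if ts.tzinfo is not None else ts.replace(tzinfo=timezone.utc)


def plan_incremental_read(
    starting_timestamp: datetime,
    retention: timedelta = timedelta(days=7),  # assumed VACUUM retention window
    now: Optional[datetime] = None,
) -> Tuple[str, Optional[datetime]]:
    """Decide between a change-feed read and a full reload.

    Returns ("cdf", starting_timestamp) when the requested start is still
    inside the retention window, otherwise ("full", None).
    """
    current = _as_utc(now) if now is not None else datetime.now(timezone.utc)
    start = _as_utc(starting_timestamp)

    # Change data older than this point may already have been removed by VACUUM.
    oldest_safe_start = current - retention

    if start < oldest_safe_start:
        return ("full", None)
    return ("cdf", start)
```

With this guard, the readChangeFeed stream would only be started when the result is "cdf". For "full", the pipeline would reload the table snapshot instead and record a new starting point for the next incremental run.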

rachelgreen2
New Contributor II

Thanks for the info.