Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Error while reading streaming source from DLT Pipeline

Bhaskar29
New Contributor II

Hi All,

I am getting an error when reading the streaming source.

Full load: it loads fine.

Incremental load: it fails with the error below.

This is the code I am using:

import time
import logging

logger = logging.getLogger(__name__)

def gim_suppliers_ln():
    logger.info("Starting __cn_gim_suppliers")
    overall_start_time = time.time()
    try:
        # Log the start time of the readStream execution
        read_stream_start_time = time.time()
        logger.info(f"readStream execution started at {read_stream_start_time}")

        spark.conf.set("spark.sql.files.ignoreMissingFiles", "true")
        spark._jvm.com.databricks.sql.transaction.tahoe.DeltaLog.clearCache()
        spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
        spark.conf.set("spark.databricks.delta.changeDataFeed.timestampOutOfRange.enabled", "true")

        # Read the change data feed from the source Delta table
        df = (
            spark.readStream
            .format("delta")
            .option("readChangeFeed", "true")
            .option("startingTimestamp", starting_timestamp)
            .option("endingTimestamp", ending_timestamp)
            .load(primary_delta_table_path_url)
        )

        # Log the end time of the readStream execution
        read_stream_end_time = time.time()
        logger.info(f"readStream execution completed at {read_stream_end_time}")
        logger.info(f"readStream execution took {read_stream_end_time - read_stream_start_time} seconds")
        return df
    except Exception as e:
        logger.error(f"readStream failed after {time.time() - overall_start_time} seconds: {e}")
        raise
 
Any help would be appreciated.

(error screenshot attached: Bhaskar29_0-1718877478034.png)

 

3 REPLIES

daniel_sahal
Esteemed Contributor

@Bhaskar29 
It looks like the source table was vacuumed; that's why the stream is unable to read the CDF for the given timestamp.
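One way to confirm this theory (a sketch, not tested against your table) is to check the table history for VACUUM operations and compare their timestamps against the CDF window you request. The helpers below work on plain dicts so the logic is easy to verify; `history_rows` is a hypothetical stand-in for the rows you would collect in Databricks from `DESCRIBE HISTORY delta.`<table_path>``.

```python
from datetime import datetime

def latest_vacuum_end(history_rows):
    """Return the timestamp of the most recent 'VACUUM END' entry in a
    Delta table history, or None if the table was never vacuumed.

    history_rows: iterable of dicts with 'operation' and 'timestamp'
    keys (e.g. built from the rows of DESCRIBE HISTORY).
    """
    vacuums = [r["timestamp"] for r in history_rows
               if r["operation"] == "VACUUM END"]
    return max(vacuums) if vacuums else None

def cdf_window_at_risk(history_rows, starting_timestamp):
    """True when the requested CDF starting timestamp predates the last
    vacuum, i.e. the underlying change files may already be deleted."""
    last_vacuum = latest_vacuum_end(history_rows)
    return last_vacuum is not None and starting_timestamp < last_vacuum
```

In a notebook you could build `history_rows` with something like `[row.asDict() for row in spark.sql("DESCRIBE HISTORY delta.`/path/to/table`").collect()]` and then warn (or fall back to a full load) whenever `cdf_window_at_risk(...)` is true.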

How do we handle this scenario? Incremental loads are failing when we pass the starting_timestamp and ending_timestamp, which we derive from the event log:

 

from datetime import datetime

# Path to the DLT pipeline event log
event_log_path = "/pipelines/{pipeline_id}/system/events"

# Find the most recent completed flow update in the event log
sql_query = f"""
SELECT id,
       MAX(timestamp) AS timestamp
FROM delta.`{event_log_path}`
WHERE event_type = 'flow_progress'
  AND details:flow_progress.status = 'COMPLETED'
GROUP BY id
ORDER BY timestamp DESC
LIMIT 1
"""
df_filtered = spark.sql(sql_query)

# Extract the timestamps for the CDF window
top_record = df_filtered.first()
starting_timestamp = top_record['timestamp']
ending_timestamp = datetime.now()
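One defensive pattern for this situation (a sketch, assuming you can determine the earliest change timestamp the table can still serve, e.g. from `DESCRIBE HISTORY` or your known vacuum retention window) is to clamp the event-log-derived starting timestamp so it never falls before the retained CDF range:

```python
from datetime import datetime

def clamp_starting_timestamp(requested, earliest_available):
    """Return a CDF starting timestamp that is never older than the
    earliest change data the table can still serve. When the
    event-log-derived timestamp predates the retained range (e.g.
    after a VACUUM), fall back to the earliest available timestamp
    instead of failing the incremental load."""
    return max(requested, earliest_available)
```

You would then pass `clamp_starting_timestamp(starting_timestamp, earliest_available)` to `.option("startingTimestamp", ...)`; `earliest_available` is an assumed value you must derive from your table's history or retention settings. Note the trade-off: clamping silently skips any changes that were vacuumed away, so you may prefer to trigger a full reload when the clamp actually fires.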

rachelgreen2
New Contributor II

thanks for the info
