Background and requirements: We are reading data from our factory and storing it in a DLT table called telemetry with columns sensorid, timestamp and value. We need to take the rows where sensorid is "qrreader-x", join them with some other data from that same table, and store the result elsewhere. The QR codes arrive with very low latency, much lower than some of the sensors they should be joined with, so we need to delay processing these rows until the other data has come in.
Suggestion: Could we create a DLT pipeline that runs as a triggered batch every 5 minutes and only reads rows whose timestamp is older than x minutes? (See the sketch at the end of this post for roughly what I mean.)
import dlt
from pyspark.sql.functions import expr

@dlt.table(
    comment="A table summarizing counts of the top baby names for New York for 2021."
)
def top_baby_names_2021():
    return (
        dlt.read("baby_names_prepared")
        .filter(expr("Year_Of_Birth == 2021"))
    )
Looking at the example code above, I assume the pipeline would consider all incoming rows as handled, even the ones that are filtered out. So is there a way to put the filtered rows back into the pipeline so they get picked up again later, once they are no longer too fresh?
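To make the suggestion concrete, here is a rough sketch of what I have in mind. The telemetry table and the sensorid/timestamp columns are from our setup; the target table name and the 5-minute delay are just placeholders for the x-minute window:

import dlt
from pyspark.sql import functions as F

@dlt.table(
    comment="QR-reader rows that are at least 5 minutes old, so the slower sensor data has had time to arrive."
)
def qr_rows_ready_to_join():
    return (
        dlt.read("telemetry")
        # Keep only the QR-reader rows that need to be joined.
        .filter(F.col("sensorid") == "qrreader-x")
        # Skip rows that are still "too fresh"; 5 minutes stands in for x minutes.
        .filter(F.col("timestamp") < F.current_timestamp() - F.expr("INTERVAL 5 MINUTES"))
    )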