From the information you provided, your issue might be resolved by setting a watermark on the streaming DataFrame. A watermark tells Structured Streaming how long to wait for late-arriving data, which in turn bounds how long records are retained in state. Without a watermark, state accumulates in memory indefinitely, eventually causing an OOM error, and your job's performance can also degrade as state grows over time.
In your case, assuming it's not necessary to retain all records in state for the lifetime of the job, you should set a reasonable threshold after which records are dropped from state. For example, you could apply a 10-minute watermark like this:
`df.withWatermark("event_time", "10 minutes")`
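For context, here is a minimal end-to-end sketch of how this fits into a streaming query. It uses the built-in `rate` source as a stand-in for your real source, and the `event_time` column, 5-minute window, and console sink are placeholders; adjust them to match your actual job:

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window

spark = SparkSession.builder.appName("watermark-example").getOrCreate()

# Placeholder source: the rate source emits (timestamp, value) rows.
# Swap in your real source (Kafka, Auto Loader, etc.) and schema.
events = (
    spark.readStream
    .format("rate")
    .option("rowsPerSecond", 10)
    .load()
    .withColumnRenamed("timestamp", "event_time")
)

# The watermark lets Spark drop state for windows more than 10 minutes
# older than the latest event_time seen; later records are discarded.
counts = (
    events
    .withWatermark("event_time", "10 minutes")
    .groupBy(window(col("event_time"), "5 minutes"))
    .count()
)

query = (
    counts.writeStream
    .outputMode("append")  # a window is emitted once the watermark passes its end
    .format("console")
    .start()
)
```

Note that in `append` output mode, each window's result is only emitted once the watermark moves past the end of that window, so choose the watermark delay as a trade-off between tolerating late data and output latency / state size.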
Please refer to this Databricks documentation article on watermarks, including code examples: https://docs.databricks.com/en/structured-streaming/watermarks.html