You could try to keep the data in sync by appending the new data in a foreachBatch on your write stream. This method allows arbitrary logic for writing each micro-batch, so you can connect to the data warehouse over JDBC if necessary:
df = spark.readStream \
    .format("delta") \
    .load(input_path)

df_write = df.writeStream \
    .format("noop") \
    .foreachBatch(batch_write_jdbc) \
    .option("checkpointLocation", checkpoint_path) \
    .start()
Here noop is a dummy sink format that does not actually write anything itself; it just starts the streaming query, and each micro-batch is then handed to the batch function, which writes the data via JDBC.
Your batch function could look something like this:
def batch_write_jdbc(df, batch_id):
    # apply any transformations you need to the micro-batch here
    df.write.jdbc(jdbc_url, table=schema_name + "." + table_name,
                  mode="append", properties=connection_properties)
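jdbc_url, schema_name, table_name and connection_properties above are placeholders for your own warehouse settings. As a minimal sketch, assuming a PostgreSQL-compatible warehouse, they might look like this (adjust host, driver and credentials to your environment):

# hypothetical connection settings for the JDBC write above
jdbc_url = "jdbc:postgresql://warehouse-host:5432/analytics"
schema_name = "staging"
table_name = "events"
connection_properties = {
    "user": "etl_user",
    "password": "etl_password",
    "driver": "org.postgresql.Driver",
}

Also note that foreachBatch gives at-least-once guarantees, so a micro-batch can be replayed after a failure; if duplicates matter in your warehouse, use the batch_id argument to deduplicate or make the write idempotent on the target side.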