Structured streaming from an overwrite delta path
08-02-2021 06:45 AM
Hi experts,
I need to ingest data from an existing Delta path into my own Delta lake. The dataflow is as follows:
- The data team reads a full snapshot of a database table and overwrites a Delta path with it. This happens many times per day, but not on a fixed schedule.
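For context, the upstream job presumably does something like this (a rough sketch on my part; the table name is an assumption):

# Sketch of the data team's job: the whole table is re-read and the
# Delta path is overwritten on every run.
full_snapshot_df = spark.table("source_db.source_table")  # table name assumed
(full_snapshot_df.write
    .format("delta")
    .mode("overwrite")
    .save(source_path))

My current streaming job to consume that path into my own sink: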
def overwrite_microbatch(microdf, batchId):
    # Replace the sink with the contents of this microbatch.
    microdf.write.format("delta").mode("overwrite").save(sink_path)

(spark.readStream
    .format("delta")
    .option("ignoreChanges", "true")  # do not fail when upstream files are rewritten
    .load(source_path)
    .writeStream
    .foreachBatch(overwrite_microbatch)
    .option("checkpointLocation", checkpoint_path)
    .start())
(A plain .writeStream.format("delta").outputMode("append") sink does not work here: since every source version is a full snapshot, "append" mode duplicates the data, and writeStream does not support an "overwrite" output mode.)
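For reference, the append sink I ruled out would look roughly like this; because every upstream version is a full snapshot, each batch re-appends the whole table:

# Rejected approach: a plain Delta append sink duplicates the data,
# since every upstream overwrite re-emits the full table.
(spark.readStream
    .format("delta")
    .option("ignoreChanges", "true")
    .load(source_path)
    .writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", checkpoint_path)
    .start(sink_path))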
This works, but I ran into two problems:
- The sink path is not storage-optimized: each version stores a full table snapshot in a .snappy.parquet file instead of only the incremental changes.
- If my streaming job fails to consume one or more source versions, the next microbatch contains a concatenation of two or more unconsumed versions, which again causes duplication in the sink path (see the merge-based sketch below this list).
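One workaround I am considering for the second problem is to replace the overwrite with a MERGE inside foreachBatch. This is only a sketch: it assumes the table has a primary key column "id" and a column such as "updated_at" for picking the latest row per key within a batch; both column names are assumptions.

from delta.tables import DeltaTable
from pyspark.sql import Window, functions as F

def merge_microbatch(microdf, batch_id):
    # Keep only the most recent row per key inside this microbatch, so a batch
    # that spans several source versions no longer produces duplicates.
    w = Window.partitionBy("id").orderBy(F.col("updated_at").desc())
    latest = (microdf
              .withColumn("_rn", F.row_number().over(w))
              .filter("_rn = 1")
              .drop("_rn"))
    if DeltaTable.isDeltaTable(spark, sink_path):
        (DeltaTable.forPath(spark, sink_path).alias("t")
            .merge(latest.alias("s"), "t.id = s.id")
            .whenMatchedUpdateAll()
            .whenNotMatchedInsertAll()
            .execute())
    else:
        # First batch: create the sink table.
        latest.write.format("delta").save(sink_path)

This only upserts, so rows deleted upstream between snapshots would still need separate handling, and I'm not sure it improves the storage situation.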
Best Regards,
Vu