2 weeks ago
Hi Team,
Currently our Silver Delta table (external) is loaded via streaming and the Gold is loaded in batch.
I need to make the Gold Delta table streaming as well. On the first run, I can see the stream initialization taking an hour or so because my Silver table is heavy. The current Gold already has data processed up to the last batch, so why is initialization taking so long? Can we reduce this time? This has been tested in QA; I need a solution before prod.
2 weeks ago - last edited 2 weeks ago
You can make the Gold stream start much faster by avoiding the "full initial snapshot" and bootstrapping from the point your Gold has already processed, plus a few rate limits and features tuned for heavy Delta tables.
When you stream directly from a Delta table, the engine first processes the entire current table state as an initial snapshot before moving to incremental changes, which is expensive on a "heavy" Silver table.
If your stream is stateful and uses a watermark, the default initial-snapshot file order (by last modification time) can also cause extra scanning and even late-data drops without event-time ordering.
Here are a few things to consider that might help you out (see the sketch after this list for the watermark case):
- skipChangeCommits: if Silver is append-only, this lets the stream safely ignore any stray update/delete commits.
- startingVersion "latest": start the Gold stream from the current end of the Silver transaction log, skipping the initial snapshot entirely.
- Rate limits: maxFilesPerTrigger / maxBytesPerTrigger cap how much each micro-batch reads from a heavy table.
- withEventTimeOrder: if you do process the initial snapshot and your query is stateful with a watermark, this processes the snapshot in event-time order to avoid late-data drops.
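If you keep the initial snapshot and your query is stateful with a watermark, a minimal sketch of enabling event-time ordering might look like this (the event_time column name is a hypothetical example, not from your schema):

# Process the initial snapshot in event-time order so the watermark
# does not drop records that merely sit in recently-modified files.
stream_df = (spark.readStream
    .option("withEventTimeOrder", "true")   # order the snapshot by event time
    .option("maxFilesPerTrigger", 500)      # optional rate limit for the heavy snapshot
    .table("catalog.schema.silver")
    .withWatermark("event_time", "10 minutes"))  # 'event_time' is a hypothetical column

Note that withEventTimeOrder cannot be changed once the stream has started from a given checkpoint, so decide before the first run.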
2 weeks ago
Hi,
Yes, the Silver is append-only for us. I understand skipChangeCommits, but I am not really sure about starting the stream from latest. I know that for Kafka we set the starting offset to latest, but how do you set it for a Delta file read?
2 weeks ago
Use the startingVersion option with value "latest" when configuring your Delta source for readStream. This makes the stream read only changes that arrive after the moment you start it, avoiding the heavy initial snapshot.
silver = "catalog.schema.silver"
checkpoint = "abfss://.../checkpoints/gold_stream"
stream_df = (spark.readStream
.table(silver)
.option("startingVersion", "latest") # skip initial snapshot
.option("maxBytesPerTrigger", 256 * 1024 * 1024) # optional rate limit
.option("maxFilesPerTrigger", 500))
(stream_df.writeStream
.option("checkpointLocation", checkpoint)
.toTable("catalog.schema.gold"))
Since your Silver is append-only, you don't need skipChangeCommits or CDF. startingVersion="latest" is the simplest way to avoid the heavy first pass while keeping Gold streaming in sync.
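One caveat: "latest" skips anything committed to Silver between your last Gold batch and the moment the stream starts. If you instead want to bootstrap exactly from the point Gold has already processed, you can pin an explicit version. A hedged sketch, assuming you know roughly when your last Gold batch ran (last_batch_ts below is a hypothetical placeholder):

from pyspark.sql import functions as F

# Find the first Silver commit after the last Gold batch run, then start there.
last_batch_ts = "2025-01-01 00:00:00"  # hypothetical placeholder
history = spark.sql("DESCRIBE HISTORY catalog.schema.silver")
start_version = (history
    .filter(F.col("timestamp") > F.lit(last_batch_ts).cast("timestamp"))
    .agg(F.min("version"))
    .collect()[0][0])

stream_df = (spark.readStream
    .option("startingVersion", str(start_version))  # explicit version, no gap
    .table("catalog.schema.silver"))

This still avoids snapshotting the whole table while guaranteeing no records fall into the gap between batch and stream.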
2 weeks ago
Hi stbjelcevic,
Our Silver source is loaded by a streaming process, and the Gold currently runs as a 10-minute batch job in prod.
Since this is a prod scenario and the source is a streaming load, I am worried about potential data loss. Can you please guide me on this approach while making sure we don't incur any data loss?
Thanks,
Naveen
2 weeks ago - last edited 2 weeks ago
@Naveenkumar1811 Since your Silver is written by a streaming job, a lot of files and metadata can be created, depending on your write interval and the frequency of new data. If many files are created within a few minutes, you can run into the small-file problem, which hurts read speed. Try running OPTIMIZE and VACUUM and see whether reads get noticeably faster. If they do, schedule a job to OPTIMIZE the table regularly, or test liquid clustering, which handles this scenario a bit more effectively.
Read up on VACUUM and its retention policy before executing it; see the sketch below.
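A minimal sketch of those maintenance commands (the table name and cluster key are example assumptions; check your downstream readers and time-travel needs before shortening retention):

spark.sql("OPTIMIZE catalog.schema.silver")                 # compact small files
spark.sql("VACUUM catalog.schema.silver RETAIN 168 HOURS")  # 7 days, the default retention

# Liquid clustering alternative ('event_date' is a hypothetical key):
spark.sql("ALTER TABLE catalog.schema.silver CLUSTER BY (event_date)")
spark.sql("OPTIMIZE catalog.schema.silver")                 # clusters existing data after ALTER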