Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Reduce the Time for First Spark Streaming Run Kick off

Naveenkumar1811
New Contributor II

Hi Team,

Currently I have a Silver Delta table (external) that is loaded via streaming, and the Gold table is loaded in batch.

I need to make the Gold Delta table streaming as well. On the first run, the stream initialization takes an hour or so because my Silver table is large. The current Gold already has data processed up to the last batch, so why does initialization take so long? Can we reduce this time? This has been tested in QA, and I need a solution before going to Prod.

3 REPLIES

stbjelcevic
Databricks Employee

You can make the Gold stream start much faster by avoiding the "full initial snapshot" and bootstrapping from the point your Gold has already processed, plus a few rate limits and features tuned for heavy Delta tables.

When you stream directly from a Delta table, the engine first processes the entire current table state as an initial snapshot before moving to incremental changes, which is expensive on a "heavy" Silver table.

If your stream is stateful and uses a watermark, the default initial-snapshot file order (by last modification time) can also cause extra scanning and even late-data drops without event-time ordering.
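For that stateful case, Delta's `withEventTimeOrder` streaming option makes the initial snapshot be processed in event-time order instead of file-modification order. A minimal sketch, assuming a running SparkSession on Databricks; the table and column names are illustrative:

```python
# Sketch: process the initial snapshot in event-time order so the watermark
# doesn't drop rows that look "late" only because of file ordering.
# "catalog.schema.silver" and "event_time" are placeholder names.
events = (spark.readStream
          .option("withEventTimeOrder", "true")   # order initial snapshot by event time
          .table("catalog.schema.silver")
          .withWatermark("event_time", "10 minutes"))
```

Note that this option only matters for the first run against a populated table; once the snapshot is consumed, incremental processing is unaffected.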

Here are a few different things to consider that might help you out:

  • Stream from Silver's CDF and start at the current processed version. If your Silver has updates/deletes (not append-only), enable Change Data Feed (CDF) on Silver and read the CDF in Gold, starting from the version your Gold has already processed; this skips the big initial snapshot and only consumes row-level changes.
  • If Silver is append-only, stream the table itself (skip change commits): For purely append-only sources, you can stream the Delta table directly and set options to ignore non-append commits when they happen. Databricks recommends skipChangeCommits for new workloads.
  • Start the stream from "latest" (no historical snapshot): If Gold is already up-to-date and you don't need the initial snapshot, set the Delta source starting point to the latest so you only process future changes.
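A minimal sketch of the first two options; the table name and version number are illustrative, and this assumes CDF is already enabled on Silver for option 1:

```python
# Option 1: read Silver's Change Data Feed, starting from the version
# Gold has already processed (1234 is a placeholder version).
cdf_df = (spark.readStream
          .option("readChangeFeed", "true")
          .option("startingVersion", 1234)   # last version Gold consumed
          .table("catalog.schema.silver"))

# Option 2: append-only Silver -- stream the table directly and skip
# any non-append (change) commits rather than failing the stream.
append_df = (spark.readStream
             .option("skipChangeCommits", "true")
             .table("catalog.schema.silver"))
```

With CDF you also get the `_change_type` metadata column, which Gold can use to apply updates and deletes rather than blindly appending.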


Naveenkumar1811
New Contributor II

Hi,

Yes, the Silver is append-only for us. I understand skipChangeCommits, but I'm not really sure about starting the stream from latest. I know that for Kafka we set the starting offset to latest, but how do you set that when reading from Delta?

stbjelcevic
Databricks Employee

Use the startingVersion option with value "latest" when configuring your Delta source for readStream. This makes the stream read only changes that arrive after the moment you start it, avoiding the heavy initial snapshot.

silver = "catalog.schema.silver"
checkpoint = "abfss://.../checkpoints/gold_stream"

stream_df = (spark.readStream
             .option("startingVersion", "latest")             # skip initial snapshot
             .option("maxBytesPerTrigger", 256 * 1024 * 1024)  # optional rate limit
             .option("maxFilesPerTrigger", 500)
             .table(silver))   # options must be set before .table()

(stream_df.writeStream
  .option("checkpointLocation", checkpoint)
  .toTable("catalog.schema.gold"))

Since your Silver is append-only, you don't need skipChangeCommits or CDF. startingVersion="latest" is the simplest way to avoid the heavy first pass while keeping Gold streaming in sync.
