04-25-2025 10:38 AM
Hi Oliver,
See the screenshot below.
First, the cluster initializes (so startup has already finished; the delay does not come from cluster startup).
Then the flows are appended to the pipeline (around 30 flows).
These flows take time to initialize, and I believe at most around 10 flows can run concurrently. Most of the flows are empty (no messages in the queue), so I expected very fast processing, but that is not the case.
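A rough way to reason about the delay: if the flows are started in batches of about 10 (my assumption, not a documented limit), then 30 flows means 3 waves of initialization, even when every topic is empty. The sketch below only does that arithmetic; `estimate_flow_startup` and the 60-second per-flow init time are hypothetical, not measured values.

```python
import math


def estimate_flow_startup(num_flows: int, max_concurrent: int, init_seconds_per_flow: float) -> tuple:
    """Return (number of waves, rough total seconds), assuming flows start in fixed-size batches."""
    waves = math.ceil(num_flows / max_concurrent)
    return waves, waves * init_seconds_per_flow


# ~30 flows, assumed limit of ~10 concurrent flows, hypothetical 60 s init per flow
waves, total = estimate_flow_startup(30, 10, 60.0)
print(waves, total)  # 3 180.0
```

If the observed startup time is close to a multiple of the per-flow init time, that would support the batching hypothesis; if not, the delay likely comes from somewhere else.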
Because of the low volume (I am in the development stage, not in production), I cannot see any metrics.
The code is pasted below:
```python
import dlt

# projectId, authOptions and pipelines are defined elsewhere in the notebook (not shown)
dlt.create_streaming_table(
    "events",
    comment="events",
    schema="messageId String Not null, payload binary not null, attributes string not null, publishTimestampInMillis bigint not null",
)


def append_flow_to_table(topic, authOpts):
    subscriptionId = f"sub_{topic}"

    # one append flow per Pub/Sub topic, all writing into the same "events" table
    @dlt.append_flow(
        target="events",
        name=f"`{topic}`",
        comment=f"{topic} flow",
    )
    def flow():
        return (
            spark.readStream
            .format("pubsub")
            .option("subscriptionId", subscriptionId)
            .option("topicId", topic)
            .option("projectId", projectId)
            .option("deleteSubscriptionOnStreamStop", "false")
            .options(**authOpts)
            .load()
        )


for pipeline in pipelines:
    print(pipeline["topic"])
    append_flow_to_table(pipeline["topic"], authOptions)

activeStreams = [q.name for q in spark.streams.active]
print("active streams:")
print(activeStreams)
```
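Since there are no dashboard metrics at this volume, one option is to read the progress of each active stream directly. In PySpark 3.x, `StreamingQuery.lastProgress` returns a dict (or `None` before the first trigger completes) containing keys such as `name`, `numInputRows` and `durationMs`. The helper below is a hypothetical sketch that summarizes those dicts; whether the DLT-managed flows are visible through `spark.streams.active` from the notebook is an assumption I have not verified, and the sample data is made up for illustration.

```python
def summarize_progress(progress_list):
    """Summarize StreamingQuery.lastProgress dicts as (name, input rows, trigger time in ms).

    Entries that are None (no trigger completed yet) are skipped.
    """
    summary = []
    for p in progress_list:
        if p is None:
            continue
        duration_ms = p.get("durationMs", {}).get("triggerExecution")
        summary.append((p.get("name"), p.get("numInputRows", 0), duration_ms))
    return summary


# In the notebook (assumption: the flows show up in spark.streams.active):
# for row in summarize_progress([q.lastProgress for q in spark.streams.active]):
#     print(row)

# Made-up sample, only to show the shape of the output
sample = [
    {"name": "topic_a", "numInputRows": 0, "durationMs": {"triggerExecution": 1200}},
    None,
]
print(summarize_progress(sample))  # [('topic_a', 0, 1200)]
```

A long `triggerExecution` time with `numInputRows` at 0 would point at per-flow overhead rather than data volume.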
I will try to look at the Google logs on the next run.
I run this Delta Live Tables pipeline once per day.
Thank you