<?xml version="1.0" encoding="UTF-8"?>
<rss xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:rdf="http://www.w3.org/1999/02/22-rdf-syntax-ns#" xmlns:taxo="http://purl.org/rss/1.0/modules/taxonomy/" version="2.0">
  <channel>
    <title>article High Throughput ‘Exactly Once’ Streaming from Google Pub/Sub with Structured Streaming on Databricks in Technical Blog</title>
    <link>https://community.databricks.com/t5/technical-blog/high-throughput-exactly-once-streaming-from-google-pub-sub-with/ba-p/52616</link>
    <description>&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="_76b95afa-9629-4a6f-a908-b9dc58a04f6c.jpeg" style="width: 559px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5553i8E090D4BE813A832/image-dimensions/559x559?v=v2" width="559" height="559" role="button" title="_76b95afa-9629-4a6f-a908-b9dc58a04f6c.jpeg" alt="_76b95afa-9629-4a6f-a908-b9dc58a04f6c.jpeg" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;H4&gt;&lt;SPAN&gt;Introduction&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;A href="https://www.google.com/url?q=https://cloud.google.com/pubsub/docs&amp;amp;sa=D&amp;amp;source=editors&amp;amp;ust=1700148476345772&amp;amp;usg=AOvVaw2Nl99pTi4Rb3eW-3iHI8hG" target="_blank" rel="noopener"&gt;Google Pub/Sub&lt;/A&gt;&lt;/SPAN&gt;&lt;SPAN&gt; is a fully managed messaging service with pub/sub semantics that is widely used by customers on Google Cloud Platform (GCP), often as an alternative to Kafka. Structured Streaming has long supported other cloud platform streaming services (e.g. AWS Kinesis/Azure Event Hubs), but until recently, consuming a stream from Google Pub/Sub relied on the legacy Spark DStream API or a Python consumer.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;This article shows how to set up the new &lt;/SPAN&gt;&lt;SPAN&gt;&lt;A href="https://www.google.com/url?q=https://docs.databricks.com/structured-streaming/pub-sub.html&amp;amp;sa=D&amp;amp;source=editors&amp;amp;ust=1700148476346219&amp;amp;usg=AOvVaw2kDAgDPmEwddaNQfSzU_S2" target="_blank" rel="noopener"&gt;Pub/Sub Structured Streaming Connector&lt;/A&gt;&lt;/SPAN&gt;&lt;SPAN&gt;, now generally available in Databricks, and demonstrates the message throughput and semantics that can be achieved.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;SPAN&gt;Initial Configuration&lt;/SPAN&gt;&lt;/H3&gt;
&lt;H4&gt;&lt;SPAN&gt;Configuring the Topic&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;To evaluate the Pub/Sub Structured Streaming connector, we first need to create a topic within our GCP project. We disable the ‘Add a default subscription’ setting, in order to configure the subscription manually.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="image11.png" style="width: 793px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5170i1B6853202E9D8440/image-size/large?v=v2&amp;amp;px=999" role="button" title="image11.png" alt="image11.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;H4&gt;&lt;SPAN&gt;Creating the Subscription&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;Message consumers in Google Pub/Sub require a subscription, which enables tracking of which messages have been delivered and acknowledged. All the settings are set to the default with the exception of the message retention duration, which is reduced to limit costs during testing.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="image23.png" style="width: 793px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5533i0862EDC9425B6973/image-size/large?v=v2&amp;amp;px=999" role="button" title="image23.png" alt="image23.png" /&gt;&lt;/span&gt;&lt;BR /&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;SPAN&gt;Publishing Test Data to Google Pub/Sub&lt;/SPAN&gt;&lt;/H3&gt;
&lt;H4&gt;&lt;SPAN&gt;Realistic Synthetic Messages&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;To provide a realistic stream of messages, we generate a list of 1,000 random strings, each 8 KB in size. These strings are encoded using UTF-8, as the payload of a Pub/Sub message is a byte array. We also add some message attributes to resemble typical real-life use cases (an integer, a JSON field, and a UUID), all of which are encoded as strings. An example message is shown below:&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="javascript"&gt;{
  "payload": "lEpg.. &amp;lt;8kb random string&amp;gt; ..Zkra",
  "attributes": {
    "message_id": "0", 
    "another_attribute": "1", 
    "json": "{\"attribute\": \"bkuAmZFbXI\", \"int\": \"0\", \"status\": \"insert\"}", 
    "uuid": "815df24f7c6f44a588af1b38942ec40d"
  }
}&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
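&lt;P&gt;&lt;SPAN&gt;The generator itself is not shown in this article. As a rough sketch only, messages of this shape could be produced as follows. The function names, the use of ASCII letters, the attribute values and the reading of 8kb as 8,192 bytes are all assumptions made for illustration.&lt;/SPAN&gt;&lt;/P&gt;

```python
import json
import random
import string
import uuid

PAYLOAD_SIZE = 8 * 1024  # assumption: "8kb" is read as 8,192 bytes
POOL_SIZE = 1000         # number of pre-generated payloads, as in the text


def build_payload_pool(pool_size=POOL_SIZE, payload_size=PAYLOAD_SIZE, seed=0):
    """Pre-generate a pool of random ASCII strings to use as payloads."""
    rng = random.Random(seed)
    return [
        "".join(rng.choices(string.ascii_letters, k=payload_size))
        for _ in range(pool_size)
    ]


def build_message(index, pool):
    """Return (payload_bytes, attributes); every attribute value is a string."""
    payload = pool[index % len(pool)].encode("utf-8")
    attributes = {
        # Hypothetical values that mimic the example message above.
        "message_id": str(index),
        "another_attribute": str(index + 1),
        "json": json.dumps({"attribute": "example", "int": str(index), "status": "insert"}),
        "uuid": uuid.uuid4().hex,
    }
    return payload, attributes


pool = build_payload_pool(pool_size=10)
payload, attributes = build_message(0, pool)
print(len(payload), attributes["message_id"])
```

&lt;P&gt;&lt;SPAN&gt;Building the pool once and indexing into it keeps string generation out of the publishing loop, so the loop itself only encodes and sends.&lt;/SPAN&gt;&lt;/P&gt;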
&lt;H4&gt;&lt;SPAN&gt;Publishing using the Python API&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;We now publish the 8 KB synthetic messages to the topic using the &lt;/SPAN&gt;&lt;SPAN&gt;&lt;A href="https://www.google.com/url?q=https://cloud.google.com/pubsub/docs/samples/pubsub-publish-custom-attributes&amp;amp;sa=D&amp;amp;source=editors&amp;amp;ust=1700148476348119&amp;amp;usg=AOvVaw1Q2nS2KF3MeMpbLMXJ0BAA" target="_blank" rel="noopener"&gt;Python API&lt;/A&gt;&lt;/SPAN&gt;&lt;SPAN&gt;. Using the &lt;/SPAN&gt;&lt;SPAN&gt;&lt;A href="https://www.google.com/url?q=https://docs.python.org/3/library/multiprocessing.html%23module-multiprocessing&amp;amp;sa=D&amp;amp;source=editors&amp;amp;ust=1700148476348246&amp;amp;usg=AOvVaw1e6E4ywK-cI38QNAklBfNd" target="_blank" rel="noopener"&gt;multiprocessing&lt;/A&gt;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;module, we run 32 parallel workers on a single &lt;/SPAN&gt;&lt;EM&gt;n2-highcpu-16&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;instance &lt;/SPAN&gt;&lt;SPAN&gt;(see GCP &lt;/SPAN&gt;&lt;SPAN&gt;&lt;A href="https://www.google.com/url?q=https://cloud.google.com/compute/docs/general-purpose-machines&amp;amp;sa=D&amp;amp;source=editors&amp;amp;ust=1700148476348454&amp;amp;usg=AOvVaw2CirDNoFn11JdSim03yEF6" target="_blank" rel="noopener"&gt;instance types&lt;/A&gt;&lt;/SPAN&gt;&lt;SPAN&gt;), &lt;/SPAN&gt;&lt;SPAN&gt;which together publish the 8 KB messages at approximately 50k messages/s, or roughly 400 MB/s of network I/O (monitored using Ganglia metrics).&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;from concurrent import futures
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

publish_futures = []

for i in range(1000):
  # excluded code to lookup random payload and attributes

  publish_future = publisher.publish(
    topic_path, data.encode("utf-8"), 
    # message attributes below this line
    message_id=.., this_attribute=.., json=.., reverse=.., this_attribute_2=..)
  publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;We run this notebook as a Databricks job, parameterised so that it can either run continuously, publishing up to 50k/s, or generate a fixed number of messages and then stop, which is useful in testing how quickly a consumer can process a backlog of messages.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;The metrics tab on the topic in the GCP console shows the published message count to be just over 50k/s, and the size of the messages, as expected, to be a fixed 8kb, with total throughput at ~400MB/s:&lt;/SPAN&gt;&lt;/P&gt;
&lt;TABLE class=" lia-align-center" style="height: 281px; width: 900px; border-style: hidden; margin-left: auto; margin-right: auto;" border="0"&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1" width="760px" height="281px"&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;TABLE class=" lia-align-center" style="border-style: hidden;" border="0"&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1" width="252px"&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="image21.png"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5181i71FF0C9D5CDDED52/image-size/large?v=v2&amp;amp;px=999" role="button" title="image21.png" alt="image21.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1" width="258px"&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="image15.png"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5174i2CC09CBC23C3548D/image-size/large?v=v2&amp;amp;px=999" role="button" title="image15.png" alt="image15.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1" width="256px"&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="image3.png"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5163i24AF2DF71780C35D/image-size/large?v=v2&amp;amp;px=999" role="button" title="image3.png" alt="image3.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1" width="40px" height="281px"&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
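&lt;P&gt;&lt;SPAN&gt;The throughput figure follows directly from the message rate and the fixed message size. A quick check, assuming decimal units (1 MB taken as 1,000 KB) and a hypothetical helper name:&lt;/SPAN&gt;&lt;/P&gt;

```python
def throughput_mb_per_s(messages_per_s, message_size_kb):
    """Convert a message rate and a fixed message size into MB/s (1 MB = 1,000 KB)."""
    return messages_per_s * message_size_kb / 1000


print(throughput_mb_per_s(50_000, 8))  # 400.0
```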
&lt;P&gt;&lt;SPAN&gt;Note that there is a lag of up to 3 minutes before the updated metrics are visible, as documented &lt;/SPAN&gt;&lt;SPAN&gt;&lt;A href="https://www.google.com/url?q=https://cloud.google.com/monitoring/api/metrics_gcp&amp;amp;sa=D&amp;amp;source=editors&amp;amp;ust=1700148476352179&amp;amp;usg=AOvVaw0yNDcyTm7Q9u0Z9b8S5inS" target="_blank" rel="noopener"&gt;here&lt;/A&gt;&lt;/SPAN&gt;&lt;SPAN&gt;. We are now ready to consume messages from the subscription.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H3&gt;&lt;SPAN&gt;Consuming from Pub/Sub&lt;/SPAN&gt;&lt;/H3&gt;
&lt;H4&gt;&lt;SPAN&gt;Google Pub/Sub vs Kafka Consumers&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;For streaming consumers, there are some key differences between Google Pub/Sub&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;and Kafka - perhaps most important is that each message has to be &lt;/SPAN&gt;&lt;EM&gt;acknowledged separately&lt;/EM&gt;&lt;SPAN&gt;, otherwise the message will be redelivered.&lt;/SPAN&gt;&lt;/P&gt;
&lt;TABLE&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;STRONG&gt;Kafka&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;STRONG&gt;Google Pub/Sub&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;STRONG&gt;Ordering&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Messages ordered within a &lt;/SPAN&gt;&lt;EM&gt;partition&lt;/EM&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Messages are not ordered by default. When message ordering is enabled on the subscription, messages that share an &lt;/SPAN&gt;&lt;EM&gt;ordering key&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;are delivered in order.&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;STRONG&gt;Offset management&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Each consumer maintains an offset, which indicates which records in the queue have already been processed.&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Each message is acknowledged individually. If a message is not acknowledged, it will be redelivered after a specific time. Messages can also be re-sent even if acknowledged.&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;STRONG&gt;Replaying historical messages&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;The offset is reset to an earlier point in the queue; all messages after this point are then redelivered.&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Messages can be marked as undelivered based on a timestamp, which will redeliver all messages after this timestamp.&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
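&lt;P&gt;&lt;SPAN&gt;The per-message acknowledgement behaviour in the table can be illustrated with a toy in-memory model. This is a sketch only: the ToySubscription class, its methods and the 10-second deadline are hypothetical and are not part of the Pub/Sub client library.&lt;/SPAN&gt;&lt;/P&gt;

```python
class ToySubscription:
    """Toy in-memory model of per-message acknowledgement; not the Pub/Sub client API."""

    def __init__(self, ack_deadline_s):
        self.ack_deadline_s = ack_deadline_s
        self._pending = {}       # message_id -> data, not yet acknowledged
        self._leased_until = {}  # message_id -> time until which redelivery is suppressed

    def publish(self, message_id, data):
        self._pending[message_id] = data

    def pull(self, now):
        """Deliver every unacknowledged message that is not currently leased."""
        delivered = []
        for message_id, data in self._pending.items():
            if now >= self._leased_until.get(message_id, 0):
                self._leased_until[message_id] = now + self.ack_deadline_s
                delivered.append((message_id, data))
        return delivered

    def ack(self, message_id):
        """Acknowledge one message so that it is never delivered again."""
        self._pending.pop(message_id, None)
        self._leased_until.pop(message_id, None)


sub = ToySubscription(ack_deadline_s=10)
sub.publish("m1", b"a")
sub.publish("m2", b"b")

first = sub.pull(now=0)    # both messages are delivered
sub.ack("m1")              # only m1 is acknowledged
second = sub.pull(now=5)   # deadline not yet reached: nothing is redelivered
third = sub.pull(now=10)   # deadline passed: m2 is delivered again
print(first, second, third)
```

&lt;P&gt;&lt;SPAN&gt;Unlike a Kafka offset, which marks everything before it as processed, each message here has its own state, so a single slow acknowledgement is enough to produce a duplicate delivery.&lt;/SPAN&gt;&lt;/P&gt;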
&lt;H4&gt;&lt;SPAN&gt;Exactly-Once Semantics&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;Typically, streaming pipelines implement &lt;/SPAN&gt;&lt;EM&gt;at-least-once&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;semantics. Messages are guaranteed to be delivered to the target table or downstream pipeline, but in the event of a failure (such as a rebalance), there may be duplicate messages. At-least-once semantics require all downstream consumers to be idempotent - they will handle duplicates without causing issues with data integrity.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;Exactly-once semantics&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;also guarantees message delivery, but additionally guarantees that there are no duplicate messages. This is more difficult to achieve, but removes the requirement for downstream components to handle or filter out duplicates, reducing complexity. &lt;/SPAN&gt;&lt;SPAN&gt;&lt;A href="https://www.google.com/url?q=https://www.databricks.com/blog/2017/01/19/real-time-streaming-etl-structured-streaming-apache-spark-2-1.html&amp;amp;sa=D&amp;amp;source=editors&amp;amp;ust=1700148476356070&amp;amp;usg=AOvVaw3TdAtFL3Z5Qf4_6J-BJuWj" target="_blank" rel="noopener"&gt;Structured streaming supports exactly-once semantics&lt;/A&gt;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;provided that the target sink also supports it. &lt;/SPAN&gt;&lt;/P&gt;
&lt;H4&gt;&lt;SPAN&gt;Consuming with Python&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;Until recently, there was no easy way to consume messages into Databricks from Pub/Sub. Many users would consume messages using the &lt;/SPAN&gt;&lt;SPAN&gt;&lt;A href="https://www.google.com/url?q=https://cloud.google.com/pubsub/docs/samples/pubsub-subscriber-sync-pull&amp;amp;sa=D&amp;amp;source=editors&amp;amp;ust=1700148476356463&amp;amp;usg=AOvVaw3vHYWwqJXGWvqjwKgzB7jU" target="_blank" rel="noopener"&gt;Python API&lt;/A&gt;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;and then write them out via a DataFrame. While this approach works, it is overly complex and difficult to maintain and monitor. &lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;A possible architecture for this approach is shown below. To ensure at-least-once semantics, we first write the output of the Python API to a Delta table, using one Notebook job. We then set up a streaming job to consume from the Delta table to acknowledge message delivery back to Pub/Sub. In this way, messages are only marked as delivered once they are written to the Delta table.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="image5.png" style="width: 999px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5164i619F0047187702F3/image-size/large?v=v2&amp;amp;px=999" role="button" title="image5.png" alt="image5.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;With this approach, many duplicates are generated because messages are not acknowledged quickly enough and are therefore redelivered. We would then likely need an expensive deduplication process, which would add further cost and latency. &lt;/SPAN&gt;&lt;/P&gt;
&lt;H4&gt;&lt;SPAN&gt;Architecture of the Structured Streaming Pub/Sub consumer&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;As mentioned above, Google Pub/Sub requires an acknowledgement for each individual message. If a message is not acknowledged within the Acknowledgement Deadline specified on subscription creation (see above), then the message will be redelivered to the consumer, which could result in duplicates.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;To ensure these duplicates are not propagated downstream, the Structured Streaming connector is implemented as two parallel jobs:&lt;/SPAN&gt;&lt;/P&gt;
&lt;TABLE&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;STRONG&gt;Message Fetching job&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Polls Pub/Sub for new messages, writing the message payload and metadata to the checkpoint folder and the metadata to RocksDB. A message is not acknowledged back to Pub/Sub until it has been successfully written to RocksDB and replicated to the cloud.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;In RocksDB, the metadata is sharded by messageId so that messages that have been seen before can be identified.&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;STRONG&gt;Microbatch Processing job&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Polls RocksDB for new micro-batches of messages that can be written downstream.&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="image1.png" style="width: 999px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5162iAED5E98B3607E8F4/image-size/large?v=v2&amp;amp;px=999" role="button" title="image1.png" alt="image1.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
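&lt;P&gt;&lt;SPAN&gt;The idea behind the two jobs can be sketched with a toy model that records a message before acknowledging it and uses the messageId to drop redeliveries. This is an illustration under stated assumptions, not the connector's implementation: the ToyFetcher class is hypothetical, a Python set stands in for RocksDB, a list stands in for the checkpoint folder, and how the real connector acknowledges redelivered messages is not described here.&lt;/SPAN&gt;&lt;/P&gt;

```python
class ToyFetcher:
    """Toy model of 'persist first, acknowledge second' with de-duplication by messageId."""

    def __init__(self):
        self.seen_ids = set()  # stand-in for the metadata kept in RocksDB
        self.buffer = []       # stand-in for payloads written to the checkpoint folder
        self.acked = []        # acknowledgements sent back to the subscription

    def on_message(self, message_id, payload):
        """Handle one delivery; a redelivery is acknowledged again but never stored twice."""
        if message_id not in self.seen_ids:
            self.buffer.append((message_id, payload))
            self.seen_ids.add(message_id)
        # Acknowledge only after the message has been recorded above.
        self.acked.append(message_id)

    def next_micro_batch(self):
        """Hand the buffered messages to the micro-batch job and clear the buffer."""
        batch, self.buffer = self.buffer, []
        return batch


fetcher = ToyFetcher()
deliveries = [("m1", b"a"), ("m2", b"b"), ("m1", b"a")]  # m1 is redelivered
for message_id, payload in deliveries:
    fetcher.on_message(message_id, payload)

batch = fetcher.next_micro_batch()
print(batch)
```

&lt;P&gt;&lt;SPAN&gt;Three deliveries arrive, but only two distinct messages reach the micro-batch, which is the property that lets the downstream sink see each message once.&lt;/SPAN&gt;&lt;/P&gt;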
&lt;H4&gt;&lt;SPAN&gt;Consuming with Structured Streaming&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;Setting up the &lt;/SPAN&gt;&lt;SPAN&gt;&lt;A href="https://www.google.com/url?q=https://docs.databricks.com/structured-streaming/pub-sub.html&amp;amp;sa=D&amp;amp;source=editors&amp;amp;ust=1700148476358997&amp;amp;usg=AOvVaw0A_-42rDuos3IgTVLmpccO" target="_blank" rel="noopener"&gt;Pub/Sub Spark Structured Streaming consumer&lt;/A&gt;&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;in Databricks is very straightforward. We simply write the stream output to a Delta table, with no transformations, so that we can check the exactly-once semantics - i.e. there are no missing or duplicate records. In practice, the stream could be used for near-real time analytics, as part of a medallion architecture, or pushed to other streaming pipelines.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;Our cluster will be initially configured with 8 workers, each with 8 cores, as shown below:&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="image10.png"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5169i68A5D00D946F6D3D/image-size/large?v=v2&amp;amp;px=999" role="button" title="image10.png" alt="image10.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;To read from the Pub/Sub stream, we use the familiar syntax &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;spark.readStream&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;, with &lt;/SPAN&gt;&lt;SPAN&gt;format&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;set to &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;pubsub&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;. The &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;subscriptionId&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;topicId&lt;/FONT&gt;&lt;SPAN&gt;&amp;nbsp;and &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;projectId&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;&amp;nbsp;are mandatory options.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;The account that is consuming from the subscription must be granted &lt;/SPAN&gt;&lt;EM&gt;both&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;Pub/Sub Subscriber&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;and &lt;/SPAN&gt;&lt;SPAN&gt;Pub/Sub Viewer&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;access (Note: in a future release, the requirement for &lt;/SPAN&gt;&lt;EM&gt;Pub/Sub Viewer&lt;/EM&gt;&lt;SPAN&gt;&amp;nbsp;will likely be removed). These permissions need to be either granted to the account (user account or service account) used to run the cluster, or a different service account can be used, by passing &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;clientId&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;clientEmail&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;, &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;privateKey&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;&amp;nbsp;and &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;privateKeyId&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;&amp;nbsp;as options to &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;readStream&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;. These parameters can be stored as secrets, or extracted from the JSON configuration file generated when the key is created:&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;import json

project_id = &amp;lt;..&amp;gt;
topic_id = dbutils.widgets.get("topic_id")
subscription_id = dbutils.widgets.get("subscription_id")

# Option 1: read the service account credentials from Databricks secrets
authOptions = {
  "clientId": dbutils.secrets.get("secret-scope", "clientId"),
  "clientEmail": dbutils.secrets.get("secret-scope", "clientEmail"),
  "privateKey": dbutils.secrets.get("secret-scope", "privateKey"),
  "privateKeyId": dbutils.secrets.get("secret-scope", "privateKeyId")
}

# Option 2: read the service account credentials from a JSON key file
# (use one option or the other; this assignment replaces option 1)
with open("&amp;lt;service account JSON key file&amp;gt;.json", "r") as f:
  data = json.load(f)

authOptions = {
  "clientId": data["client_id"],
  "clientEmail": data["client_email"],
  "privateKey": data["private_key"],
  "privateKeyId": data["private_key_id"]
}

df = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", subscription_id) # required
  .option("topicId", topic_id) # required
  .option("projectId", project_id) # required
  .options(**authOptions) # required only if using a separate service account
  .load()
)&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;The DataFrame returned from the Pub/Sub stream has the following schema:&lt;/SPAN&gt;&lt;/P&gt;
&lt;PRE&gt;&lt;SPAN&gt;messageId: &lt;/SPAN&gt;&lt;SPAN&gt;string&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;payload: &lt;/SPAN&gt;&lt;SPAN&gt;binary&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;attributes: &lt;/SPAN&gt;&lt;SPAN&gt;string&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;publishTimestampInMillis: &lt;/SPAN&gt;&lt;SPAN&gt;long&lt;/SPAN&gt;&lt;/PRE&gt;
&lt;P&gt;&lt;SPAN&gt;Before writing the stream to Delta, we decode the UTF-8 encoded payload and unpack the attributes column, which is returned as a JSON-encoded string, into a column of type MAP&amp;lt;STRING, STRING&amp;gt;:&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;import pyspark.sql.types as T
import pyspark.sql.functions as F

query = (df
  .withColumn("payload", F.decode(F.col("payload"), "UTF-8"))
  .withColumn("attributes", F.from_json(F.col("attributes"), T.MapType(T.StringType(), T.StringType())))
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", checkpoint_path)
  .start(save_path)
)
&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;The resulting Delta table has the following schema:&lt;/SPAN&gt;&lt;/P&gt;
&lt;PRE&gt;&lt;SPAN&gt;messageId: &lt;/SPAN&gt;&lt;SPAN&gt;string&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;payload: &lt;/SPAN&gt;&lt;SPAN&gt;string&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;attributes: &lt;/SPAN&gt;&lt;SPAN&gt;map&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;key&lt;/SPAN&gt;&lt;SPAN&gt;: string&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&amp;nbsp;&lt;/SPAN&gt;&lt;SPAN&gt;value&lt;/SPAN&gt;&lt;SPAN&gt;: string&lt;/SPAN&gt;&lt;BR /&gt;&lt;SPAN&gt;publishTimestampInMillis: &lt;/SPAN&gt;&lt;SPAN&gt;long&lt;/SPAN&gt;&lt;/PRE&gt;
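&lt;P&gt;&lt;SPAN&gt;To make the two column transformations concrete, the sketch below applies the equivalent steps to a single row in plain Python, outside Spark. The decode_row helper and the sample values are hypothetical; the attribute names are taken from the example message earlier in the article.&lt;/SPAN&gt;&lt;/P&gt;

```python
import json


def decode_row(payload, attributes):
    """Mirror the two column transformations for a single row, outside Spark."""
    decoded_payload = payload.decode("utf-8")  # binary -> string
    attribute_map = json.loads(attributes)     # JSON string -> dict of string to string
    return decoded_payload, attribute_map


raw_attributes = json.dumps({
    "message_id": "0",
    "json": json.dumps({"attribute": "bkuAmZFbXI", "int": "0", "status": "insert"}),
})
payload, attributes = decode_row(b"lEpg", raw_attributes)

# The nested "json" attribute is still a string and needs a second parse.
nested = json.loads(attributes["json"])
print(payload, attributes["message_id"], nested["status"])
```

&lt;P&gt;&lt;SPAN&gt;Because every attribute value is a string, a JSON document stored in an attribute stays a string after the map is unpacked and has to be parsed separately if its fields are needed.&lt;/SPAN&gt;&lt;/P&gt;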
&lt;P&gt;&lt;SPAN&gt;On running the notebook, the familiar stream metrics UI shows that the messages are being consumed and written down to Delta:&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="image24.png"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5182iE8084C22A2FC825E/image-size/large?v=v2&amp;amp;px=999" role="button" title="image24.png" alt="image24.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;The output dataset in Delta can be seen below:&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="image16.png"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5175i334E958320AE2E5C/image-size/large?v=v2&amp;amp;px=999" role="button" title="image16.png" alt="image16.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;H4&gt;&lt;SPAN&gt;Monitoring message throughput&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;It is essential to monitor any streaming pipeline to ensure that it keeps up with the rate of input messages, which can fluctuate over time, and that any backlog is kept to a minimum. Equally, it is important to ensure that compute resources are not underutilised when the rate of input messages is low. &lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;There are several ways to monitor the performance of the Pub/Sub Structured Streaming connector.&lt;/SPAN&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;&lt;SPAN&gt;Using the &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;&lt;A href="https://www.google.com/url?q=https://spark.apache.org/docs/latest/api/python/reference/pyspark.ss/api/pyspark.sql.streaming.StreamingQueryListener.html&amp;amp;sa=D&amp;amp;source=editors&amp;amp;ust=1700148476368033&amp;amp;usg=AOvVaw3TEMD_YlwGcv3KTnH5MNZ6" target="_blank" rel="noopener"&gt;StreamingQueryListener&lt;/A&gt;&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;&amp;nbsp;API, example code shown below:&lt;/SPAN&gt;&lt;/LI&gt;
&lt;/UL&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;from pyspark.sql.streaming import StreamingQueryListener

class PubSubListener(StreamingQueryListener):
  def onQueryStarted(self, event):
    pass

  def onQueryProgress(self, event):
    try:
      print(event.progress.sources[0].numInputRows)
      print(event.progress.sources[0].inputRowsPerSecond)
      print(event.progress.sources[0].processedRowsPerSecond)
      print(event.progress.sources[0].metrics)
    except Exception as e:
      print(f"Error occurred {e}")

  def onQueryTerminated(self, event):
    print(f"{event.id} got terminated!")

  def onQueryIdle(self, event):
    pass

listener = PubSubListener()
spark.streams.addListener(listener)&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;This API returns the same metrics that are displayed in the stream metrics UI above. It also returns Pub/Sub custom metrics, such as &lt;/SPAN&gt;&lt;SPAN&gt;numRecordsReadyToProcess&lt;/SPAN&gt;&lt;SPAN&gt;; however, this metric only reports the count of messages available to be read at that point and does not correlate with the number of unacknowledged messages.&lt;/SPAN&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;&lt;SPAN&gt;Viewing the metrics tab on the subscription page in the GCP console, example graphs shown in the subsequent section.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI&gt;&lt;SPAN&gt;Calling the GCP metrics API to pull the metrics for that subscription, example code shown below:&lt;/SPAN&gt;&lt;/LI&gt;
&lt;/UL&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;import datetime
import time

from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query

client = monitoring_v3.MetricServiceClient()

while stream.isActive: # stream is the Spark stream reference
  time.sleep(10)

  results = query.Query(
    client,
    project_id,
    'pubsub.googleapis.com/subscription/num_undelivered_messages',
    end_time=datetime.datetime.now(),
    minutes=1,
  ).select_resources(subscription_id=subscription_id)

  undelivered_messages = ([point.value.int64_value for result in results for
    point in result.points] or [None])[0]

  if undelivered_messages is not None:
    print(f"{undelivered_messages} undelivered messages")&lt;/LI-CODE&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;
&lt;H4 id="h.ntbk509szt8l"&gt;&lt;SPAN&gt;Configuring for maximum performance&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;We can see from the subscription metrics tab shown below that with the 50k/s publish message rate, the structured streaming consumer is only achieving half the required throughput of messages, given the current default parameters and cluster size. The oldest unacknowledged message age is continuing to grow:&lt;/SPAN&gt;&lt;/P&gt;
&lt;TABLE class="lia-align-center" style="border-style: hidden; width: 819px; margin-left: auto; margin-right: auto;" width="819"&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1" width="403px"&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="image4.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5161i9FCCBA3DA612754E/image-size/medium?v=v2&amp;amp;px=400" role="button" title="image4.png" alt="image4.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1" width="416px"&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="image20.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5179i7953704892EEFC4C/image-size/medium?v=v2&amp;amp;px=400" role="button" title="image20.png" alt="image20.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;&lt;SPAN&gt;To ensure the messages are being processed in a timely way, we need to increase throughput, ideally without increasing cluster size. We increase the &lt;/SPAN&gt;&lt;SPAN&gt;numFetchPartitions&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;parameter, which defaults to one partition per executor, to one partition per core (i.e. 64 as we have 64 cores).&lt;/SPAN&gt;&lt;/P&gt;
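&lt;P&gt;&lt;SPAN&gt;As a minimal sketch, the partition count can be derived from the cluster shape and passed to the reader as an option. The pubsub_tuning_options helper is hypothetical, and the commented usage assumes the reader options shown earlier in this article.&lt;/SPAN&gt;&lt;/P&gt;

```python
def pubsub_tuning_options(num_workers, cores_per_worker):
    """Build reader options that request one fetch partition per core."""
    return {"numFetchPartitions": str(num_workers * cores_per_worker)}


tuning_options = pubsub_tuning_options(num_workers=8, cores_per_worker=8)
print(tuning_options)  # {'numFetchPartitions': '64'}

# Hypothetical usage with the reader defined earlier:
# df = (spark.readStream
#   .format("pubsub")
#   .options(**tuning_options)
#   ...  # subscriptionId, topicId, projectId and credentials as before
#   .load())
```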
&lt;TABLE class="lia-align-center" style="border-style: hidden; margin-left: auto; margin-right: auto;"&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="image6.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5165iEEF619EAD9FA68A4/image-size/medium?v=v2&amp;amp;px=400" role="button" title="image6.png" alt="image6.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="image7.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5166iE2C296284D3721BC/image-size/medium?v=v2&amp;amp;px=400" role="button" title="image7.png" alt="image7.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;&lt;SPAN&gt;The messages are now being consumed faster than they are being published, and the message age is beginning to decrease. But can we do better? Increasing &lt;/SPAN&gt;&lt;SPAN&gt;numFetchPartitions&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;to 128 doesn’t double the throughput, but it does improve performance to around 95k/s, which equates to over 700MB/s.&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="image17.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5177i5BB316C6BAEFD086/image-size/medium?v=v2&amp;amp;px=400" role="button" title="image17.png" alt="image17.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
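&lt;P&gt;&lt;SPAN&gt;The throughput figures quoted in this test follow from the message rate multiplied by the message size. The short sketch below reproduces that arithmetic, assuming each synthetic message is 8,000 bytes and that 1 MB is 10**6 bytes; both are assumptions about how the figures were rounded.&lt;/SPAN&gt;&lt;/P&gt;

```python
def throughput_mb_per_s(messages_per_s, message_bytes):
    """Convert a message rate and an average message size into MB/s (1 MB = 10**6 bytes)."""
    return messages_per_s * message_bytes / 1_000_000


MESSAGE_BYTES = 8_000  # assumption: the 8kb message size is read as 8,000 bytes

print(throughput_mb_per_s(50_000, MESSAGE_BYTES))  # publish rate: 400.0
print(throughput_mb_per_s(95_000, MESSAGE_BYTES))  # consume rate at 128 partitions: 760.0
```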
&lt;P&gt;&lt;SPAN&gt;We now have the option of reducing the cluster to two n2-standard-16 workers. We can see that this configuration can’t quite sustain the 50k messages/s throughput, so it is better to retain at least three workers, which leaves headroom to catch up after outages or spikes in message volume.&lt;/SPAN&gt;&lt;/P&gt;
&lt;TABLE class="lia-align-center" style="border-style: hidden; margin-left: auto; margin-right: auto;"&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="image19.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5178i2741B2944D9E4726/image-size/medium?v=v2&amp;amp;px=400" role="button" title="image19.png" alt="image19.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="image22.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5180i92DFF8F7F9A14FAF/image-size/medium?v=v2&amp;amp;px=400" role="button" title="image22.png" alt="image22.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;&lt;SPAN&gt;In a final test, we reduce the message size but increase the volume of inbound messages to 300k/s, to check that the Structured Streaming connector can still support this higher message rate. The topic metrics below show that the publish throughput is now around 250MB/s:&lt;/SPAN&gt;&lt;/P&gt;
&lt;TABLE class="lia-align-center" style="border-style: hidden; margin-left: auto; margin-right: auto;"&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1" width="272px"&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="image18.png"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5176i0C086A46E6B0FDA7/image-size/large?v=v2&amp;amp;px=999" role="button" title="image18.png" alt="image18.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1" width="276px"&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="image12.png"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5171i629F48CEE67DBFE7/image-size/large?v=v2&amp;amp;px=999" role="button" title="image12.png" alt="image12.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1" width="272px"&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="image13.png"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5173i0C3E1DC9477C8742/image-size/large?v=v2&amp;amp;px=999" role="button" title="image13.png" alt="image13.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;&lt;SPAN&gt;To process this volume of messages, we increase the cluster size to 20 workers. This configuration can consume around 320k messages/s, slightly above the publish rate, which allows us to work through any backlog of messages:&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="image14.png" style="width: 400px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5172iABA3E2A091286E5C/image-size/medium?v=v2&amp;amp;px=400" role="button" title="image14.png" alt="image14.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;H4 id="h.kuwo8y368qpf"&gt;&lt;SPAN&gt;Configuring for minimum latency&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;The configuration used above is optimized for throughput but not latency. To reduce the latency, we configure the following setting on the Structured Streaming consumer:&lt;/SPAN&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;spark.readStream
  ...
  .option("maxFetchPeriod", "1s")&lt;/LI-CODE&gt;
&lt;P&gt;&lt;SPAN&gt;And on the Delta table stream writer:&lt;/SPAN&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;  .trigger(processingTime='1 seconds')&lt;/LI-CODE&gt;
&lt;P&gt;&lt;SPAN&gt;Pub/Sub already provides a timestamp field indicating when each message was published. This is written to our Delta table, so we use it to calculate the latency (in milliseconds) for our test dataset of 100k records:&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="image9.png" style="width: 793px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5535i4A062FDC4C5DB4B8/image-size/large?v=v2&amp;amp;px=999" role="button" title="image9.png" alt="image9.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
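&lt;P&gt;&lt;SPAN&gt;The latency calculation itself is a subtraction per record followed by a summary. The sketch below shows the idea in plain Python, assuming each record carries a publish timestamp and a second timestamp taken when the record became available in the Delta table, both as epoch milliseconds. The function name, the field names and the sample values are hypothetical and are not measurements from this test.&lt;/SPAN&gt;&lt;/P&gt;

```python
from statistics import median


def latency_summary(rows):
    """Summarise end-to-end latency in milliseconds.

    Each row is a (published_ms, written_ms) pair of epoch timestamps in milliseconds.
    """
    latencies = [written_ms - published_ms for published_ms, written_ms in rows]
    return {"min": min(latencies), "median": median(latencies), "max": max(latencies)}


# Made-up timestamps for illustration only.
sample_rows = [
    (1_700_000_000_000, 1_700_000_004_200),
    (1_700_000_000_500, 1_700_000_006_100),
    (1_700_000_001_000, 1_700_000_009_300),
]
print(latency_summary(sample_rows))
```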
&lt;P&gt;&lt;SPAN&gt;But can we do better? The Spark UI indicates multiple micro-batches are required to consume the 100k records, because the value of &lt;/SPAN&gt;&lt;SPAN&gt;maxRecordsPerFetch&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;is too small. So we increase this:&lt;/SPAN&gt;&lt;/P&gt;
&lt;LI-CODE lang="python"&gt;  .option("maxRecordsPerFetch", 10000)&lt;/LI-CODE&gt;
&lt;P&gt;&lt;SPAN&gt;This means all the records fit in a single micro-batch, reducing the latency to below 10 seconds:&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="image2.png" style="width: 793px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5534i685B244ACEC838EF/image-size/large?v=v2&amp;amp;px=999" role="button" title="image2.png" alt="image2.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;H4&gt;&lt;SPAN&gt;Failure scenarios&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;Before testing various failure scenarios, we will consume from the same topic using a &lt;/SPAN&gt;&lt;SPAN&gt;separate&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;subscription, using the Python API consumer as described earlier. We know from this architecture that messages will only be acknowledged to Pub/Sub once they are in the Delta table, guaranteeing at-least-once semantics.&lt;/SPAN&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-center" image-alt="image8.png" style="width: 999px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5167iC81D0BF0A6108E2A/image-size/large?v=v2&amp;amp;px=999" role="button" title="image8.png" alt="image8.png" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&lt;SPAN&gt;To test that Structured Streaming implements ‘exactly once’ semantics, we need to ensure the stream pipeline meets two criteria:&lt;/SPAN&gt;&lt;/P&gt;
&lt;TABLE&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;STRONG&gt;Criteria&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;STRONG&gt;Evaluation Approach&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;STRONG&gt;Passed&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;No duplicate messages are created in the target Delta table&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Check for duplicate messages in the target table, testing the &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;MessageId&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;&amp;nbsp;and the &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;UUID&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;&amp;nbsp;columns separately. &lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Yes&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;No messages exist in the Pub/Sub subscription that were not also written to the Delta table&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Check for messages that exist in the Delta table generated from the Python API that do not exist in the table generated from the Structured Streaming consumer, using the &lt;/SPAN&gt;&lt;FONT face="courier new,courier"&gt;&lt;SPAN&gt;LEFT &lt;/SPAN&gt;&lt;SPAN&gt;ANTI&lt;/SPAN&gt;&lt;SPAN&gt;&amp;nbsp;JOIN&lt;/SPAN&gt;&lt;/FONT&gt;&lt;SPAN&gt;&amp;nbsp;syntax. &lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Yes&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
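&lt;P&gt;&lt;SPAN&gt;The two checks in the table can be sketched in plain Python as follows. This is an illustration of the logic only, not the queries used in the test: the function names and the sample message IDs are hypothetical, and on Databricks the same checks would be expressed against the Delta tables, with the second one using the LEFT ANTI JOIN syntax mentioned above.&lt;/SPAN&gt;&lt;/P&gt;

```python
from collections import Counter


def duplicate_keys(keys):
    """Return the sorted keys that occur more than once."""
    counts = Counter(keys)
    return sorted(key for key, count in counts.items() if count != 1)


def missing_keys(reference_keys, target_keys):
    """Return keys present in the reference but absent from the target (a left anti join on the key)."""
    return sorted(set(reference_keys) - set(target_keys))


# Hypothetical message IDs: the Python consumer's table is the reference,
# the Structured Streaming table is the target under test.
python_consumer_ids = ["m1", "m2", "m3", "m4"]
streaming_ids = ["m1", "m2", "m3", "m4"]

print(duplicate_keys(streaming_ids))                     # [] means no duplicates
print(missing_keys(python_consumer_ids, streaming_ids))  # [] means nothing was lost
```

&lt;P&gt;&lt;SPAN&gt;Running the duplicate check once per key column mirrors the approach of testing the MessageId and UUID columns separately.&lt;/SPAN&gt;&lt;/P&gt;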
&lt;P&gt;&lt;SPAN&gt;We evaluate several failure scenarios during a running stream:&lt;/SPAN&gt;&lt;/P&gt;
&lt;TABLE&gt;
&lt;TBODY&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;STRONG&gt;Failure Scenario&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;STRONG&gt;Impact&lt;/STRONG&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Increasing cluster size during pipeline running&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;No impact to latency. No duplicate or missing messages. &lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Reducing cluster size during pipeline running&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Latency increases during downsize, and then may stay higher depending on throughput. No duplicate or missing messages. &lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Graceful shutdown of the Structured Streaming pipeline&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;No duplicate or missing messages. &lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Hard shutdown of the Structured Streaming pipeline&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;No duplicate or missing messages. &lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;TR&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Removal or modification of the checkpoint folder&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;TD colspan="1" rowspan="1"&gt;
&lt;P&gt;&lt;SPAN&gt;Stream fails and will not restart until the checkpoint folder is removed. Messages will need to be replayed from an earlier date and then deduplicated manually.&lt;/SPAN&gt;&lt;/P&gt;
&lt;/TD&gt;
&lt;/TR&gt;
&lt;/TBODY&gt;
&lt;/TABLE&gt;
&lt;P&gt;&lt;SPAN&gt;Of all the scenarios tested, only the final scenario caused duplicate or missing messages.&lt;/SPAN&gt;&lt;/P&gt;
&lt;H4&gt;&lt;SPAN&gt;Conclusion&lt;/SPAN&gt;&lt;/H4&gt;
&lt;P&gt;&lt;SPAN&gt;We have seen that the new Spark Structured Streaming Pub/Sub connector is a convenient, performant solution for reliably consuming messages from Google Pub/Sub:&lt;/SPAN&gt;&lt;/P&gt;
&lt;UL&gt;
&lt;LI&gt;&lt;SPAN&gt;We showed how it scaled on a small cluster to sustain an impressive throughput of 400MB/s, consisting of 50k relatively large (8KB) synthetic messages per second, persisting them to a Delta table. &lt;/SPAN&gt;&lt;/LI&gt;
&lt;LI&gt;&lt;SPAN&gt;We also saw that with the right configuration, latency can be reduced to below 10 seconds, measured from a message being published to it being available for querying in a Delta table.&lt;/SPAN&gt;&lt;/LI&gt;
&lt;/UL&gt;
&lt;P&gt;&lt;SPAN&gt;This new capability will both enable existing GCP customers to unlock new use cases, and allow Spark users with streaming use cases to migrate to GCP.&lt;/SPAN&gt;&lt;/P&gt;</description>
    <pubDate>Mon, 11 Dec 2023 17:48:51 GMT</pubDate>
    <dc:creator>thewizard</dc:creator>
    <dc:date>2023-12-11T17:48:51Z</dc:date>
    <item>
      <title>High Throughput ‘Exactly Once’ Streaming from Google Pub/Sub with Structured Streaming on Databricks</title>
      <link>https://community.databricks.com/t5/technical-blog/high-throughput-exactly-once-streaming-from-google-pub-sub-with/ba-p/52616</link>
      <description>&lt;P&gt;&lt;SPAN&gt;&lt;span class="lia-inline-image-display-wrapper lia-image-align-inline" image-alt="_76b95afa-9629-4a6f-a908-b9dc58a04f6c.jpeg" style="width: 999px;"&gt;&lt;img src="https://community.databricks.com/t5/image/serverpage/image-id/5553i8E090D4BE813A832/image-size/large?v=v2&amp;amp;px=999" role="button" title="_76b95afa-9629-4a6f-a908-b9dc58a04f6c.jpeg" alt="_76b95afa-9629-4a6f-a908-b9dc58a04f6c.jpeg" /&gt;&lt;/span&gt;&lt;/SPAN&gt;&lt;/P&gt;
&lt;P&gt;&amp;nbsp;&lt;/P&gt;</description>
      <pubDate>Mon, 11 Dec 2023 17:48:51 GMT</pubDate>
      <guid>https://community.databricks.com/t5/technical-blog/high-throughput-exactly-once-streaming-from-google-pub-sub-with/ba-p/52616</guid>
      <dc:creator>thewizard</dc:creator>
      <dc:date>2023-12-11T17:48:51Z</dc:date>
    </item>
    <item>
      <title>Re: High Throughput ‘Exactly Once’ Streaming from Google Pub/Sub with Structured Streaming on Databr</title>
      <link>https://community.databricks.com/t5/technical-blog/high-throughput-exactly-once-streaming-from-google-pub-sub-with/bc-p/71971#M189</link>
      <description>&lt;P&gt;Thanks for the detailed blog! I'm testing a similar scenario. However, in production we have to add additional deduplication to achieve exactly-once from the data sources through to the Delta table, because no data source publisher can ensure exactly-once delivery. Therefore we apply dropDuplicatesWithinWatermark with a unique event ID, as recommended in &lt;A href="https://docs.gcp.databricks.com/en/structured-streaming/watermarks.html" target="_blank"&gt;https://docs.gcp.databricks.com/en/structured-streaming/watermarks.html&lt;/A&gt;. In my tests, we lose events from time to time whenever dropDuplicatesWithinWatermark is added. Without dropDuplicatesWithinWatermark, I never lose events.&lt;/P&gt;&lt;P&gt;&lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/32121"&gt;@thewizard&lt;/a&gt;: Did you test this scenario? Or do you know whether the design and implementation of dropDuplicatesWithinWatermark could cause this issue? Thanks!&lt;/P&gt;</description>
      <pubDate>Fri, 07 Jun 2024 03:06:33 GMT</pubDate>
      <guid>https://community.databricks.com/t5/technical-blog/high-throughput-exactly-once-streaming-from-google-pub-sub-with/bc-p/71971#M189</guid>
      <dc:creator>aerofish</dc:creator>
      <dc:date>2024-06-07T03:06:33Z</dc:date>
    </item>
    <item>
      <title>Re: High Throughput ‘Exactly Once’ Streaming from Google Pub/Sub with Structured Streaming on Databr</title>
      <link>https://community.databricks.com/t5/technical-blog/high-throughput-exactly-once-streaming-from-google-pub-sub-with/bc-p/72007#M190</link>
      <description>&lt;P&gt;I don't know enough about your configuration and use case to really answer this. There shouldn't be duplicates written by the Pub/Sub connector, as they are deduped in RocksDB. However, they are deduped using the messageId field, which may be different from your "unique event ID". The Pub/Sub connector cannot dedupe on anything other than the messageId, so if there are duplicate records coming in to Pub/Sub, then you would still need to use dropDuplicatesWithinWatermark to get rid of them.&lt;/P&gt;</description>
      <pubDate>Fri, 07 Jun 2024 07:35:08 GMT</pubDate>
      <guid>https://community.databricks.com/t5/technical-blog/high-throughput-exactly-once-streaming-from-google-pub-sub-with/bc-p/72007#M190</guid>
      <dc:creator>thewizard</dc:creator>
      <dc:date>2024-06-07T07:35:08Z</dc:date>
    </item>
    <item>
      <title>Re: High Throughput ‘Exactly Once’ Streaming from Google Pub/Sub with Structured Streaming on Databr</title>
      <link>https://community.databricks.com/t5/technical-blog/high-throughput-exactly-once-streaming-from-google-pub-sub-with/bc-p/72280#M192</link>
      <description>&lt;P&gt;Hi &lt;a href="https://community.databricks.com/t5/user/viewprofilepage/user-id/32121"&gt;@thewizard&lt;/a&gt; , thanks for your quick reply. Yes, I have to deduplicate on my business unique ID, because the message publisher (not Pub/Sub or the Databricks connector) can create duplicates when it retries publishing. I'm therefore relying on dropDuplicatesWithinWatermark for deduplication, and this method causes message loss...&lt;/P&gt;</description>
      <pubDate>Tue, 11 Jun 2024 02:33:04 GMT</pubDate>
      <guid>https://community.databricks.com/t5/technical-blog/high-throughput-exactly-once-streaming-from-google-pub-sub-with/bc-p/72280#M192</guid>
      <dc:creator>aerofish</dc:creator>
      <dc:date>2024-06-11T02:33:04Z</dc:date>
    </item>
    <item>
      <title>Re: High Throughput ‘Exactly Once’ Streaming from Google Pub/Sub with Structured Streaming on Databr</title>
      <link>https://community.databricks.com/t5/technical-blog/high-throughput-exactly-once-streaming-from-google-pub-sub-with/bc-p/75155#M202</link>
      <description>&lt;P&gt;Apologies for the late reply, I was at DAIS. Messages should not be dropped by&amp;nbsp;&lt;SPAN&gt;dropDuplicatesWithinWatermark (except where a message is a genuine duplicate, based on the specific keys). I would raise a support ticket if the problem persists so that we can investigate.&lt;/SPAN&gt;&lt;/P&gt;</description>
      <pubDate>Thu, 20 Jun 2024 09:21:06 GMT</pubDate>
      <guid>https://community.databricks.com/t5/technical-blog/high-throughput-exactly-once-streaming-from-google-pub-sub-with/bc-p/75155#M202</guid>
      <dc:creator>thewizard</dc:creator>
      <dc:date>2024-06-20T09:21:06Z</dc:date>
    </item>
  </channel>
</rss>

