Technical Blog
Explore in-depth articles, tutorials, and insights on data analytics and machine learning in the Databricks Technical Blog. Stay updated on industry trends, best practices, and advanced techniques.
thewizard
Databricks Employee

_76b95afa-9629-4a6f-a908-b9dc58a04f6c.jpeg

Introduction

Google Pub/Sub is a fully managed messaging service with publish/subscribe semantics that is widely used on Google Cloud Platform (GCP), often as an alternative to Kafka. Structured Streaming has long supported other cloud streaming services (e.g. AWS Kinesis and Azure Event Hubs), but until recently, consuming a stream from Google Pub/Sub relied on the legacy Spark DStream API or a Python consumer.

This article shows how to set up the new Pub/Sub Structured Streaming Connector, now generally available in Databricks, and demonstrates the message throughput and semantics that can be achieved.

Initial Configuration

Configuring the Topic

To evaluate the Pub/Sub Structured Streaming connector, we first need to create a topic within our GCP project. We disable the ‘Add a default subscription’ setting, in order to configure the subscription manually.

image11.png

Creating the Subscription

Message consumers in Google Pub/Sub require a subscription, which enables tracking of which messages have been delivered and acknowledged. All the settings are set to the default with the exception of the message retention duration, which is reduced to limit costs during testing.

image23.png

Publishing Test Data to Google Pub/Sub

Realistic Synthetic Messages

In order to provide a realistic stream of messages, we generate a list of 1000 random strings of size 8kb. These strings will be encoded using UTF-8, as the payload of a Pub/Sub message is a byte array. We also add some message attributes to resemble typical real-life use cases (an integer, a JSON field, and a UUID), which are encoded as strings. An example message is shown below:

 

 

{
  "payload": "lEpg.. <8kb random string> ..Zkra",
  "attributes": {
    "message_id": "0", 
    "another_attribute": "1", 
    "json": "{\"attribute\": \"bkuAmZFbXI\", \"int\": \"0\", \"status\": \"insert\"}", 
    "uuid": "815df24f7c6f44a588af1b38942ec40d"
  }
}
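A minimal sketch of how messages of this shape could be generated is shown here. The helper names (random_string, make_message), the attribute value ranges, and the reading of 8kb as 8,192 single-byte characters are assumptions for illustration, not the code used in the test.

```python
import json
import random
import string
import uuid

PAYLOAD_SIZE = 8 * 1024  # assumption: "8kb" means 8,192 single-byte characters


def random_string(length, rng):
    """Return a random string of ASCII letters of the given length."""
    return "".join(rng.choices(string.ascii_letters, k=length))


def make_message(i, payloads, rng):
    """Build one synthetic message; every attribute value is a string."""
    attributes = {
        "message_id": str(i),
        "another_attribute": str(rng.randint(0, 9)),
        "json": json.dumps(
            {"attribute": random_string(10, rng), "int": str(i), "status": "insert"}
        ),
        "uuid": uuid.uuid4().hex,
    }
    return {"payload": rng.choice(payloads), "attributes": attributes}


rng = random.Random(42)
payloads = [random_string(PAYLOAD_SIZE, rng) for _ in range(1000)]
message = make_message(0, payloads, rng)
```

Because the payload uses only ASCII letters, its UTF-8 encoding has the same length in bytes as the string has characters.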

 

 

Publishing using the Python API

We now publish the 8kb synthetic messages to the topic using the Python API. Using the multiprocessing module, we run 32 threads on a single n2-highcpu-16 instance (see GCP instance types), which publishes the 8kb messages at a rate of approximately 50k/s. This is around 400MB/s of network I/O (monitored using Ganglia metrics).

 

 

from concurrent import futures
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)

publish_futures = []

for i in range(1000):
  # excluded code to lookup random payload and attributes

  publish_future = publisher.publish(
    topic_path, data.encode("utf-8"), 
    # message attributes below this line
    message_id=.., this_attribute=.., json=.., reverse=.., this_attribute_2=..)
  publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

 

 

We run this notebook as a Databricks job, parameterised so that it can either run continuously, publishing up to 50k/s, or generate a fixed number of messages and then stop, which is useful in testing how quickly a consumer can process a backlog of messages.
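The throughput implied by this publish rate can be estimated with quick arithmetic. The function name and the two readings of "8kb" (8,000 or 8,192 bytes) are assumptions for illustration:

```python
def throughput_mb_per_s(messages_per_s, message_size_bytes):
    """Aggregate publish throughput in MB/s (1 MB = 1,000,000 bytes)."""
    return messages_per_s * message_size_bytes / 1_000_000


expected = throughput_mb_per_s(50_000, 8_000)    # 8kb read as 8,000 bytes
upper = throughput_mb_per_s(50_000, 8 * 1024)    # 8kb read as 8,192 bytes
print(expected, upper)
```

Either reading gives a figure of roughly 400MB/s for 50k messages per second.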

The metrics tab on the topic in the GCP console shows the published message count to be just over 50k/s, and the size of the messages, as expected, to be a fixed 8kb, with total throughput at ~400MB/s:

 

image21.png

image15.png

image3.png

 

 

Note that there is a lag of up to 3 minutes before the updated metrics are visible, as documented here. We are now ready to consume messages from the subscription.

Consuming from Pub/Sub

Google Pub/Sub vs Kafka Consumers

For streaming consumers, there are some key differences between Google Pub/Sub and Kafka. Perhaps the most important is that each message has to be acknowledged separately, otherwise the message will be redelivered.

 

| | Kafka | Google Pub/Sub |
|---|---|---|
| Ordering | Messages are ordered within a partition. | Messages are not ordered by default. They can be ordered within a topic if ordering is enabled. |
| Offset management | Each consumer maintains an offset, which indicates which records in the queue have already been processed. | Each message is acknowledged individually. If a message is not acknowledged, it will be redelivered after a specific time. Messages can also be re-sent even if acknowledged. |
| Replaying historical messages | The offset is set to an earlier point in the queue; all messages after this point are then redelivered. | Messages can be marked as undelivered based on a timestamp, which will redeliver all messages after this timestamp. |
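The per-message acknowledgement behaviour in the comparison above can be illustrated with a toy in-memory model. This is a sketch only: the ToySubscription class is hypothetical, it is not the Pub/Sub client API, and it simplifies real behaviour (for example, it never re-sends an acknowledged message).

```python
class ToySubscription:
    """Toy in-memory model of per-message acknowledgement (not the Pub/Sub API)."""

    def __init__(self, messages, ack_deadline):
        self.ack_deadline = ack_deadline
        # message_id -> time at which the message may next be delivered
        self.next_delivery = {message_id: 0 for message_id in messages}

    def pull(self, now):
        """Deliver every outstanding message whose ack deadline has passed."""
        due = [m for m, t in self.next_delivery.items() if t <= now]
        for message_id in due:
            self.next_delivery[message_id] = now + self.ack_deadline
        return due

    def ack(self, message_id):
        """Acknowledged messages are removed and not redelivered in this model."""
        self.next_delivery.pop(message_id, None)


sub = ToySubscription(["a", "b", "c"], ack_deadline=10)
first = sub.pull(now=0)    # all three messages are delivered
sub.ack("a")
sub.ack("b")               # "c" is never acknowledged
early = sub.pull(now=5)    # deadline not yet reached: nothing is delivered
late = sub.pull(now=10)    # deadline passed: "c" is redelivered
```

A Kafka consumer would instead track a single offset per partition, so there is no equivalent of an individual message being redelivered because its acknowledgement was late.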

Exactly-Once Semantics

Typically, streaming pipelines implement at-least-once semantics. Messages are guaranteed to be delivered to the target table or downstream pipeline, but in the event of a failure (such as a rebalance), there may be duplicate messages. At-least-once semantics require all downstream consumers to be idempotent - they will handle duplicates without causing issues with data integrity.

Exactly-once semantics also guarantees message delivery, but additionally guarantees that there are no duplicate messages. This is more difficult to achieve, but removes the requirement for downstream components to handle or filter out duplicates, reducing complexity. Structured streaming supports exactly-once semantics provided that the target sink also supports it.
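To make the idempotency requirement concrete, the sketch below contrasts a naive append with a sink keyed by message ID. The IdempotentSink class is a hypothetical illustration, not part of Structured Streaming or Delta:

```python
class IdempotentSink:
    """Toy sink keyed by message ID, so reprocessing a duplicate is a no-op."""

    def __init__(self):
        self.rows = {}

    def write(self, message_id, payload):
        # Upsert by key: a redelivered message overwrites itself instead of
        # appending a second row.
        self.rows[message_id] = payload


deliveries = [("1", "x"), ("2", "y"), ("1", "x")]  # message "1" is delivered twice

append_only = []
sink = IdempotentSink()
for message_id, payload in deliveries:
    append_only.append((message_id, payload))  # naive append keeps the duplicate
    sink.write(message_id, payload)

print(len(append_only), len(sink.rows))  # 3 2
```

With at-least-once delivery every downstream consumer needs logic of this kind; with exactly-once delivery the plain append is already correct.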

Consuming with Python

Until recently, there was no easy way to consume messages into Databricks from Pub/Sub. Previously many users would consume using the Python API and then write out via a Dataframe - while this approach works, it is overly complex, and difficult to maintain and monitor.

A possible architecture for this approach is shown below. To ensure at-least-once semantics, we first write the output of the Python API to a Delta table, using one Notebook job. We then set up a streaming job to consume from the Delta table to acknowledge message delivery back to Pub/Sub. In this way, messages are only marked as delivered once they are written to the Delta table.

image5.png

With this approach, many duplicates are generated because the messages aren’t acknowledged fast enough. We would likely then need an expensive deduplication process, which would add further cost and latency.

Architecture of the Structured Streaming Pub/Sub consumer

As mentioned above, Google Pub/Sub requires an acknowledgement for each individual message. If a message is not acknowledged within the Acknowledgement Deadline specified on subscription creation (see above), then the message will be redelivered to the consumer, which could result in duplicates.

To ensure these duplicates are not propagated downstream, the Structured Streaming connector is implemented as two parallel jobs:

Message Fetching job

Polls Pub/Sub for new messages, writing the message payload and metadata down to the checkpoint folder and writing the metadata to RocksDB. The message is not acknowledged back to Pub/Sub until it is successfully written to RocksDB and replicated to cloud storage.

In RocksDB the metadata is sharded by the messageId in order to be able to identify messages that have been seen before.

Microbatch Processing job

Polls RocksDB for new micro-batches of messages that can be written downstream.
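The split described above can be sketched as a toy model. Everything here is an assumption for illustration: the ToyConnector class, the in-memory sets standing in for RocksDB, and the hash-based sharding are not the connector's real implementation.

```python
from collections import deque


class ToyConnector:
    """Toy model of the fetch / micro-batch split; not the connector's real code."""

    def __init__(self, num_shards=4):
        # Stand-in for RocksDB: one set of seen message IDs per shard.
        self.seen = [set() for _ in range(num_shards)]
        self.pending = deque()   # messages persisted but not yet in a micro-batch
        self.acked = []          # IDs acknowledged back to the (simulated) broker

    def _shard(self, message_id):
        return hash(message_id) % len(self.seen)

    def fetch(self, delivered):
        """Fetch job: record the message first, then acknowledge it."""
        for message_id, payload in delivered:
            shard = self.seen[self._shard(message_id)]
            if message_id not in shard:
                shard.add(message_id)
                self.pending.append((message_id, payload))
            # Acknowledge only once the ID is recorded (or is a known duplicate).
            self.acked.append(message_id)

    def next_micro_batch(self, max_records):
        """Micro-batch job: drain up to max_records persisted messages."""
        batch = []
        while self.pending and len(batch) < max_records:
            batch.append(self.pending.popleft())
        return batch


connector = ToyConnector()
connector.fetch([("m1", "a"), ("m2", "b")])
connector.fetch([("m2", "b"), ("m3", "c")])   # "m2" is redelivered by the broker
batch = connector.next_micro_batch(max_records=10)
```

The redelivered message is acknowledged again but never reaches a micro-batch a second time, which is the property that keeps duplicates from propagating downstream.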

image1.png

Consuming with Structured Streaming

Setting up the Pub/Sub Spark Structured Streaming consumer in Databricks is very straightforward. We simply write the stream output to a Delta table, with no transformations, so that we can check the exactly-once semantics - i.e. there are no missing or duplicate records. In practice, the stream could be used for near-real time analytics, as part of a medallion architecture, or pushed to other streaming pipelines.

Our cluster will be initially configured with 8 workers, each with 8 cores, as shown below:

image10.png

To read from the Pub/Sub stream, we use the familiar syntax spark.readStream, with format set to pubsub. The subscriptionId, topicId and projectId are mandatory options.

The account that is consuming from the subscription must be granted both Pub/Sub Subscriber and Pub/Sub Viewer access (Note: in a future release, the requirement for Pub/Sub Viewer will likely be removed). These permissions need to be either granted to the account (user account or service account) used to run the cluster, or a different service account can be used, by passing clientId, clientEmail, privateKey and privateKeyId as options to readStream. These parameters can be stored as secrets, or extracted from the JSON configuration file generated when the key is created:

 

 

import json

project_id = <..>
topic_id = dbutils.widgets.get("topic_id")
subscription_id = dbutils.widgets.get("subscription_id")

# Option 1: specify the service account credentials using Databricks secrets
authOptions = {
  "clientId": dbutils.secrets.get("secret-scope", "clientId"),
  "clientEmail": dbutils.secrets.get("secret-scope", "clientEmail"),
  "privateKey": dbutils.secrets.get("secret-scope", "privateKey"),
  "privateKeyId": dbutils.secrets.get("secret-scope", "privateKeyId")
}

# Option 2: read the service account credentials from a JSON key file
with open("<service account JSON key file>.json", "r") as f:
  data = json.load(f)

authOptions = {
  "clientId": data["client_id"],
  "clientEmail": data["client_email"],
  "privateKey": data["private_key"],
  "privateKeyId": data["private_key_id"]
}

df = (spark.readStream
  .format("pubsub")
  .option("subscriptionId", subscription_id) # required
  .option("topicId", topic_id) # required
  .option("projectId", project_id) # required
  .options(**authOptions) # required only if using a different service account
  .load()
)

 

 

The Dataframe returned from the Pub/Sub stream has the following schema:

messageId: string
payload: binary
attributes: string
publishTimestampInMillis: long

Before writing the table to Delta, we decode the UTF-8 encoded payload into a string and unpack the attributes column, which is returned as a JSON map, into a column of type MAP<STRING, STRING>:

 

 

import pyspark.sql.types as T
import pyspark.sql.functions as F

query = (df
  .withColumn("payload", F.decode(F.col("payload"), "UTF-8"))
  .withColumn("attributes", F.from_json(F.col("attributes"), T.MapType(T.StringType(), T.StringType())))
  .writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", checkpoint_path)
  .start(save_path)
)

 

 

So the final Dataframe in the Delta table has the following schema:

messageId: string
payload: string
attributes: map
        key: string
        value: string
publishTimestampInMillis: long

On running the notebook, the familiar stream metrics UI shows that the messages are being consumed and written down to Delta:

image24.png

The output dataset in Delta can be seen below:

image16.png

Monitoring message throughput

It is essential to monitor any streaming pipeline to ensure that it is keeping up with the rate of input messages, which could fluctuate over time, keeping any backlog to a minimum. Equally, it is important to ensure that compute resources are not under utilised when the rate of input messages is low.

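There are several ways to monitor the performance of the Pub/Sub Structured Streaming connector:

  • Using the StreamingQueryListener API to report progress metrics for each micro-batch, example code shown below: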

 

 

from pyspark.sql.streaming import StreamingQueryListener

class PubSubListener(StreamingQueryListener):
  def onQueryStarted(self, event):
    pass

  def onQueryProgress(self, event):
    try:
      print(event.progress.sources[0].numInputRows)
      print(event.progress.sources[0].inputRowsPerSecond)
      print(event.progress.sources[0].processedRowsPerSecond)
      print(event.progress.sources[0].metrics)
    except Exception as e:
      print(f"Error occurred {e}")

  def onQueryTerminated(self, event):
    print(f"{event.id} got terminated!")

  def onQueryIdle(self, event):
    pass

listener = PubSubListener()
spark.streams.addListener(listener)

 

 

This API returns the same metrics that are displayed in the stream metrics UI above. It also returns Pub/Sub custom metrics, such as numRecordsReadyToProcess. However, this metric only returns the count of messages available to be read at that point, and does not correlate with the number of unacknowledged messages.

  • Viewing the metrics tab on the subscription page in the GCP console, example graphs shown in the subsequent section.
  • Calling the GCP metrics API to pull the metrics for that subscription, example code shown below:

 

 

import datetime
import time

from google.cloud import monitoring_v3
from google.cloud.monitoring_v3 import query

client = monitoring_v3.MetricServiceClient()

while stream.isActive: # stream is the Spark stream reference
  time.sleep(10)

  results = query.Query(
    client,
    project_id,
    'pubsub.googleapis.com/subscription/num_undelivered_messages',
    end_time=datetime.datetime.now(),
    minutes=1,
  ).select_resources(subscription_id=subscription_id)

  undelivered_messages = ([point.value.int64_value for result in results for
    point in result.points] or [None])[0]

  if undelivered_messages is not None:
    print(f"{undelivered_messages} undelivered messages")

 

 

Configuring for maximum performance

We can see from the subscription metrics tab shown below that with the 50k/s publish message rate, the structured streaming consumer is only achieving half the required throughput of messages, given the current default parameters and cluster size. The oldest unacknowledged message age is continuing to grow:

image4.png

image20.png

To ensure the messages are being processed in a timely way, we need to increase throughput, ideally without increasing cluster size. We increase the numFetchPartitions parameter, which defaults to one partition per executor, to one partition per core (i.e. 64 as we have 64 cores).
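The partition count used here follows directly from the cluster shape. In the sketch below, the helper fetch_partitions and the reader_options dictionary are hypothetical names; only the numFetchPartitions option itself comes from the connector:

```python
def fetch_partitions(num_workers, cores_per_worker, partitions_per_core=1):
    """Number of fetch partitions for a cluster of the given shape."""
    return num_workers * cores_per_worker * partitions_per_core


# 8 workers x 8 cores, one partition per core, as in the test cluster.
reader_options = {"numFetchPartitions": str(fetch_partitions(8, 8))}

# In a notebook these would be passed to the reader alongside the required
# options, e.g. spark.readStream.format("pubsub").options(**reader_options)
print(reader_options)  # {'numFetchPartitions': '64'}
```

Deriving the value from the cluster shape, rather than hard-coding it, makes it easier to keep the setting in step when the cluster is resized.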

image6.png

image7.png

The messages are now being consumed faster than they are being published, and the message age is beginning to decrease. But can we do better? Increasing numFetchPartitions to 128 doesn’t double the throughput, but it does improve performance to around 95k/s, which equates to over 700MB/s.

image17.png

We now have the option of reducing the cluster size down to two workers of n2-standard-16. We can see that this cluster configuration can’t quite maintain the 50k/s messages throughput, so it is better to retain at least three workers to allow it to catch up from any outages or spikes in the message volume.

image19.png

image22.png

In a final test, we reduce the message size but increase the volume of inbound messages to 300k/s, to ensure that this increased message rate can still be supported by the Structured Streaming connector. We see in the topic metrics below that the publish throughput is now around 250MB/s:

image18.png

image12.png

image13.png

To process this volume of messages, we increase the cluster size to 20 workers. This configuration provides enough throughput to consume around 320k/s rate of messages, which will allow us to process any backlog of messages:

image14.png
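The headroom between the publish rate and the consume rate determines how quickly a backlog clears. The sketch below is a rough estimate only: the function name and the backlog of 6 million messages are hypothetical, while the rates are those quoted in this section.

```python
def seconds_to_clear_backlog(backlog, publish_rate, consume_rate):
    """Time to drain a backlog while new messages keep arriving.

    Returns None when the consumer is not faster than the publisher,
    because the backlog then never shrinks.
    """
    net_rate = consume_rate - publish_rate
    if net_rate <= 0:
        return None
    return backlog / net_rate


# Hypothetical backlog of 6 million messages.
print(seconds_to_clear_backlog(6_000_000, 300_000, 320_000))  # 300.0
print(seconds_to_clear_backlog(6_000_000, 50_000, 95_000))
print(seconds_to_clear_backlog(6_000_000, 50_000, 25_000))    # None
```

The first case shows that 20k/s of headroom is enough to recover, but only slowly, which is why some spare capacity is worth keeping for outages or spikes.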

Configuring for minimum latency

The configuration used above is optimized for throughput but not latency. To reduce the latency, we configure the following setting on the Structured Streaming consumer:

 

 

spark.readStream
  ...
  .option("maxFetchPeriod", "1s")

 

 

And on the Delta table stream writer:

 

 

  .trigger(processingTime='1 seconds')

 

 

Pub/Sub already provides a timestamp field indicating when the message was published. This is written to our Delta table, so we use it to calculate the latency (in milliseconds) for our test dataset of 100k records:

image9.png
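One way such a latency figure could be computed is sketched below. It assumes each record carries the publish timestamp alongside a second timestamp captured when the row reached the Delta table; that second timestamp, the function name, and the sample values are hypothetical.

```python
import statistics


def latency_summary(records):
    """Summarise end-to-end latency in milliseconds.

    Each record is a (publish_ms, written_ms) pair; written_ms is a
    hypothetical timestamp captured when the row reached the Delta table.
    """
    latencies = sorted(written - published for published, written in records)
    return {
        "min": latencies[0],
        "median": statistics.median(latencies),
        "max": latencies[-1],
    }


sample = [(1_000, 9_500), (2_000, 9_500), (3_000, 12_000), (4_000, 12_000)]
summary = latency_summary(sample)
```

Records written in the same micro-batch share a write time, so the earliest-published message in each batch shows the highest latency.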

But can we do better? The Spark UI indicates multiple micro-batches are required to consume the 100k records, because the value of maxRecordsPerFetch is too small. So we increase this:

 

 

  .option("maxRecordsPerFetch", 10000)

 

 

This means all the records fit in a single iteration of the microbatch, reducing the latency down to below 10 seconds:

image2.png

Failure scenarios

Before testing various failure scenarios, we will consume from the same topic using a separate subscription, using the Python API consumer as described earlier. We know from this architecture that messages will only be acknowledged to Pub/Sub once they are in the Delta table, guaranteeing at-least-once semantics.

image8.png

To test that the Structured Streaming connector implements ‘exactly once’ semantics, we need to ensure the stream pipeline conforms to two criteria:

| Criteria | Evaluation Approach | Passed |
|---|---|---|
| No duplicate messages are created in the target Delta table | Check for duplicate messages in the target table, testing the MessageId and the UUID columns separately. | Yes |
| No messages exist in the Pub/Sub subscription that were not also written to the Delta table | Check for messages that exist in the Delta table generated from the Python API that do not exist in the table generated from the Structured Streaming consumer, using the LEFT ANTI JOIN syntax. | Yes |
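The logic of the two checks above can be expressed in plain Python as follows. This is a sketch of the comparison only, with hypothetical function names and sample IDs; the evaluation itself was run against the Delta tables.

```python
from collections import Counter


def duplicated_keys(keys):
    """Return the keys that occur more than once."""
    return sorted(key for key, count in Counter(keys).items() if count > 1)


def anti_join(reference_keys, target_keys):
    """Keys present in the reference table but absent from the target table."""
    target = set(target_keys)
    return sorted(key for key in set(reference_keys) if key not in target)


python_api_ids = ["m1", "m2", "m2", "m3"]   # at-least-once: may contain duplicates
streaming_ids = ["m1", "m2", "m3"]          # expected to be exactly-once

duplicates = duplicated_keys(streaming_ids)
missing = anti_join(python_api_ids, streaming_ids)
```

Both results being empty corresponds to a pass on both criteria: no duplicates in the streaming table, and nothing in the reference table that the streaming table lacks.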

We evaluate several failure scenarios during a running stream:

| Failure Scenario | Impact |
|---|---|
| Increasing cluster size during pipeline running | No impact to latency. No duplicate or missing messages. |
| Reducing cluster size during pipeline running | Latency increases during downsize, and then may stay higher depending on throughput. No duplicate or missing messages. |
| Graceful shutdown of the Structured Streaming pipeline | No duplicate or missing messages. |
| Hard shutdown of the Structured Streaming pipeline | No duplicate or missing messages. |
| Removal or modification of the checkpoint folder | Stream fails and will not restart until the checkpoint folder is removed. Messages will need to be replayed from an earlier date and then deduplicated manually. |

Of all the scenarios tested, only the final scenario caused duplicate or missing messages.

Conclusion

We have seen that the new Spark Structured Streaming Pub/Sub connector is a convenient, performant solution for reliably consuming messages from Google Pub/Sub:

  • We showed how it scaled on a small cluster to sustain an impressive throughput of 400MB/s, consisting of 50k/s relatively large (8kb) synthetic messages, persisting them to a Delta table.
  • We also saw that with the right configuration, latency can be reduced to below 10 seconds from messages being published through to being available for querying in a Delta table.

This new capability will both enable existing GCP customers to unlock new use cases, and allow Spark users with streaming use cases to migrate to GCP.

4 Comments
aerofish
New Contributor III

Thanks for the detailed blog! I'm testing a similar scenario. However, in production we have to add additional deduplication to achieve exactly-once from the data sources through to the Delta table, since no data source publisher can ensure exactly-once delivery. We therefore apply dropDuplicatesWithinWatermark with a unique event ID, as recommended in https://docs.gcp.databricks.com/en/structured-streaming/watermarks.html. In my tests, we lose events from time to time whenever dropDuplicatesWithinWatermark is added. Without dropDuplicatesWithinWatermark, I never lose events.

@thewizard: Have you tested such a scenario? Or do you know whether the design and implementation of dropDuplicatesWithinWatermark could cause this issue? Thanks!

thewizard
Databricks Employee

I don't know enough about your configuration and use case to really answer this. There shouldn't be duplicates written by the Pub/Sub connector, as they are deduped in RocksDB. However, they are deduped using the messageId field, which may be different to your "unique event ID". There is no way for the Pub/Sub connector to dedupe on something other than the messageId, so if there are duplicate records coming in to Pub/Sub, then you would still need to use dropDuplicatesWithinWatermark to get rid of them.

aerofish
New Contributor III

Hi @thewizard, thanks for your quick reply. Yes, I have to do the deduplication based on my business unique ID, because the message publisher (rather than Pub/Sub or the Databricks connector) can create duplicates when it retries publishing. I'm therefore relying on dropDuplicatesWithinWatermark for deduplication, and this method causes message loss...

thewizard
Databricks Employee

Apologies for the late reply, I was at DAIS. dropDuplicatesWithinWatermark should not drop a message unless it is a genuine duplicate, based on the specific keys. If the problem persists, I would raise a support ticket so that we can investigate.