The new PySpark Custom Data Sources API, introduced at DAIS 2024, simplifies integrating custom data sources with Apache Spark. Imagine seamlessly streaming incremental data from any API straight into Delta tables via Structured Streaming. The current documentation covers the basics, but it misses one crucial feature: progress tracking. As things stand, if you pause and resume a streaming job that uses a custom data source, it always restarts from scratch. This blog shows how to implement checkpoint-like capabilities that track progress for custom data sources, and includes an example of reading data from a public API in Structured Streaming and incrementally ingesting it into a Delta table. The code in this article can be found in my GitHub repo.
Presently, Spark has native capability to integrate with a number of data sources. Broadly speaking, it can connect to message buses (e.g. Kafka, Azure Event Hubs, Amazon Kinesis) and cloud storage (e.g. Amazon S3, Azure Data Lake Storage Gen2), and it supports a number of formats (e.g. delimited, JSON, XML and Parquet).
To date, integration with other types of sources (e.g. APIs) required bespoke development in Python. Also, the capability to define a custom data source was only available in Scala/Java, which was limiting for the Python community. This all changed at DAIS 2024 when the new Custom Data Source API in Apache Spark was introduced.
The new Custom Data Source API enables reading from custom data sources and writing to custom data sinks using Python. Developers can use PySpark to define custom, reusable connections to data systems and implement additional functionality. On top of that, it supports reading from and writing to such sources in streaming form. Seen from the perspective of a medallion architecture, this is quite a powerful capability: one can incrementally ingest data from such custom data sources into bronze tables while enabling downstream tables to be built and processed incrementally as well.
The current documentation covers the basics, including a few examples of how to develop streaming custom data sources. However, based on the examples provided, you don't get checkpointing-like capability: if you pause and resume a streaming job that ingests data from a custom data source, it doesn't keep track of progress and always starts from scratch. By contrast, when you use Auto Loader and configure a checkpoint location, it tracks progress and provides exactly-once guarantees: Auto Loader only processes new files and ignores the ones that have already been processed. Streaming without checkpointing or progress tracking is of limited use, since the main premise of streaming is to ingest new data incrementally.
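For contrast, this is roughly what the familiar Auto Loader pattern with a checkpoint location looks like; the paths, file format and table name below are placeholders:

# Auto Loader with a checkpoint location: Spark tracks which files have been
# processed, so pausing and resuming the query only picks up new files.
(spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")                       # placeholder file format
    .load("/Volumes/catalog/schema/volume/landing/")            # placeholder input path
    .writeStream
    .option("checkpointLocation",
            "/Volumes/catalog/schema/volume/autoloader_checkpoint/")  # placeholder
    .outputMode("append")
    .toTable("catalog.schema.bronze_table")                     # placeholder target table
)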
The solution presented in this article overcomes such limitations, enabling a checkpointing-like capability to achieve progress tracking and exactly-once guarantees. With this approach, your streaming jobs resume reading from the last point onwards instead of reading again from the beginning.
In this article, I am going to use a publicly available API that provides dummy data representing users' comments. In each micro-batch we will read 10 records from the API, and we will design the solution so that this is configurable.
Raw data from the API looks like this:
[
{
"postId": 1,
"id": 1,
"name": "id labore ex et quam laborum",
"email": "Eliseo@gardner.biz",
"body": "laudantium enim quasi est quidem magnam voluptate ipsam eos\ntempora quo necessitatibus\ndolor quam autem quasi\nreiciendis et nam sapiente accusantium"
},
{
"postId": 1,
"id": 2,
"name": "quo vero reiciendis velit similique earum",
"email": "Jayne_Kuhic@sydney.com",
"body": "est natus enim nihil est dolore omnis voluptatem numquam\net omnis occaecati quod ullam at\nvoluptatem error expedita pariatur\nnihil sint nostrum voluptatem reiciendis et"
},
{
"postId": 1,
"id": 3,
"name": "odio adipisci rerum aut animi",
"email": "Nikita@garfield.biz",
"body": "quia molestiae reprehenderit quasi aspernatur\naut expedita occaecati aliquam eveniet laudantium\nomnis quibusdam delectus saepe quia accusamus maiores nam est\ncum et ducimus et vero voluptates excepturi deleniti ratione"
}
]
After processing, the schema of the data stored in the Delta table will be:
id int, name string, email string, body string
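Before wiring this into Spark, it can help to preview the API by hand. The sketch below is not part of the pipeline; it simply fetches one page of comments using the same _start/_limit query parameters the stream reader uses later:

import requests

# Standalone preview of the API (not part of the pipeline): fetch the first 10 comments
response = requests.get(
    "https://jsonplaceholder.typicode.com/comments",
    params={"_start": 0, "_limit": 10},
)
for r in response.json():
    # These four fields map onto the Delta table schema shown above
    print(r["id"], r["name"], r["email"], r["body"][:40])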
The main premise of using the PySpark Custom Data Source API for reading streaming data is to subclass two classes from pyspark.sql.datasource: DataSource and DataSourceStreamReader.
The DataSource subclass consists of defining the following methods: name (the short name used in spark.readStream.format), schema (the schema of the data produced by the source) and streamReader (which returns the DataSourceStreamReader instance).
For our example of defining a custom data source against the comments API, it looks like this:
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, InputPartition
from pyspark.sql.types import StructType


class CommentsAPIStreamDataSource(DataSource):
    """
    An example data source for streaming data from a public API containing users' comments.
    """

    @classmethod
    def name(cls):
        # Short name used in spark.readStream.format(...)
        return "comments_api"

    def schema(self):
        # Schema of the records produced by this source
        return "id int, name string, email string, body string"

    def streamReader(self, schema: StructType):
        # Returns the stream reader that does the actual work
        return CommentsAPIStreamReader(schema, self.options)
The DataSourceStreamReader subclass is where the actual logic of reading data from the custom source, along with the checkpointing-like capability, is implemented. A brief description of its key methods follows.
As Python developers will already know, __init__ is the constructor of the class. In this example, we pass schema and options as parameters. options is a dictionary representing the options passed during the spark.readStream invocation. For instance, if we specify api_url as an option:
spark.readStream.format("comments_api").option("api_url","https://jsonplaceholder.typicode.com/comments")
then it can be retrieved in the class's constructor via options.get("api_url").
In the __init__ method, we initialize class attributes that are then used by subsequent methods (e.g. progress_path, rows_per_batch, token, etc.). We also invoke the _load_progress method, which loads the progress-related metadata for our custom checkpointing-like feature. More on that later.
initialOffset returns the initial offset. When the custom data source is used for the first time in spark.readStream, this method is invoked and returns the starting offset, which indicates from where to read the data. For our purpose, we return zero as the initial offset to indicate that we want to start reading from the first comment in the API. If the streaming query is re-run, this method isn't invoked.
latestOffset returns the latest offset that the current micro-batch will read to. For instance, if we want to read 10 records from the source in each micro-batch, that logic is implemented here by incrementing the self.current attribute accordingly.
commit is invoked when the query has finished reading data up to the end offset. For example, if we have configured the custom data source to read 10 records in each micro-batch, this method is invoked at the end of that micro-batch, after the 10 records have been read, to store the progress. This is where the progress-tracking capability hooks in to ensure that streaming resumes from the last point it left off.
read is where the actual logic of reading data from the source is implemented. For an API-based source, the code makes a GET REST call to the API endpoint, along with the relevant parameters, to retrieve a chunk of data. Each retrieved record is then parsed into a tuple, in accordance with the schema defined in the pyspark.sql.datasource.DataSource subclass's schema method, and yielded back.
The next two methods are custom and implement the progress-tracking capability.
_save_progress is responsible for saving the progress of the streaming read. As mentioned before, even though Structured Streaming already has a checkpointing capability for fault tolerance and resumption, it doesn't work when streaming from custom data sources; custom data sources require their own implementation of progress tracking.
Though this can be implemented in various ways, I have resorted to using UC Volumes. The approach uses the UC Volumes REST endpoints to store the progress metadata as a simple JSON file. Specifically, when _save_progress is invoked, it stores (overwriting any existing file) a JSON file with the following payload:
{"current": self.current}
where self.current contains the offset that the data has been read up to. For instance, after running the Structured Streaming job for two micro-batches and then stopping it, the JSON will look like this:
{"current": 20}
meaning it has read/processed 20 records from the custom source in that run. The next time the streaming job is resumed, it will start reading from the 21st record onwards.
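Concretely, with rows_per_batch left at its default of 10, the first micro-batch of the resumed run translates that stored offset into an API request like the following sketch (the pagination parameters mirror the ones used in the read method):

import requests

# Progress previously saved by _save_progress: {"current": 20}
current = 20
rows_per_batch = 10

# The first micro-batch of the resumed run reads the next 10 records,
# i.e. records 21-30 of the comments API
response = requests.get(
    "https://jsonplaceholder.typicode.com/comments",
    params={"_start": current, "_limit": rows_per_batch},
)
print([r["id"] for r in response.json()])  # ids 21 through 30 for this API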
One may ask why I am interacting with UC Volumes via the REST API. The primary reason is that if you try to interact with UC Volumes via Python file libraries, you will run into permission errors.
_load_progress, as the name states, loads the progress. It reads the JSON file stored by the _save_progress method to determine where to resume reading data from the custom source. Similar to _save_progress, it relies on a REST API call to the UC Volume to read the JSON file.
These are the key methods and are likely the ones you will need to modify to customize the solution for your requirements. The other methods are required for the internal working of the class and hence are not discussed here.
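One note before the full listing: the class below references a few names that must exist in the module. The sketch below shows the imports it relies on, plus a minimal RangePartition helper (modelled on the InputPartition pattern from the PySpark docs) and the workspace_url and token variables, which are assumed to be defined elsewhere in the notebook:

import json
import requests
from typing import Iterator, Tuple

from pyspark.sql.datasource import DataSourceStreamReader, InputPartition


class RangePartition(InputPartition):
    # Minimal partition object carrying the offset range a micro-batch should read
    def __init__(self, start, end):
        self.start = start
        self.end = end


# Assumed to be defined elsewhere in the notebook (placeholders):
# workspace_url = "<your-workspace-host>"
# token = dbutils.secrets.get("<scope>", "<key>")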
In the end, the class looks like this:
class CommentsAPIStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        # Options passed via spark.readStream.option(...)
        self.api_url = options.get("api_url")
        self.progress_path = options.get("progress_path")
        self.rows_per_batch = int(options.get("rows_per_batch", 10))
        # UC Volumes Files API endpoint for the progress.json file
        self.url = f"https://{workspace_url}/api/2.0/fs/files{self.progress_path}progress.json"
        self.token = token
        self._load_progress()

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def _load_progress(self):
        # Read progress.json from the UC Volume via the Files API.
        headers = {
            "Authorization": f"Bearer {self.token}",
        }
        response = requests.get(self.url, headers=headers)
        # On the very first run the file may not exist yet, so default the offset to 0.
        current = response.json().get('current', 0)
        self.current = current

    def _save_progress(self):
        # Overwrite progress.json in the UC Volume with the latest offset.
        url = f"{self.url}?overwrite=true"
        headers = {
            "Authorization": f"Bearer {self.token}",
            "Content-Type": "application/json"
        }
        data = json.dumps({"current": self.current})
        response = requests.put(url, headers=headers, data=data)

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += self.rows_per_batch
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        self._save_progress()

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        # Fetch one page of comments from the API for this micro-batch
        params = {
            "_start": start,
            "_limit": self.rows_per_batch
        }
        response = requests.get(self.api_url, params=params)
        for r in response.json():
            # Yield tuples matching the schema: id int, name string, email string, body string
            yield (r['id'], r['name'], r['email'], r['body'])
Once you have defined your classes, the next step is to register the custom data source so that it can be used in a Structured Streaming job. This can be done via:
spark.dataSource.register(CommentsAPIStreamDataSource)
Once registered, it can be used in Structured Streaming jobs as follows:
from pyspark.sql import functions as F

(spark.readStream
    .format("comments_api")
    .option("api_url", "https://jsonplaceholder.typicode.com/comments")
    .option("progress_path", "/Volumes/catalog/schema/volume/custom_data_sources/progress/")
    .load()
    .select("*", F.current_timestamp().alias("current_ts"))
    .writeStream
    .option("checkpointLocation",
            "/Volumes/catalog/schema/volume/custom_data_sources/checkpoint/")
    .outputMode("append")
    .toTable("catalog.schema.table")
)
In the example above, api_url points to the public comments API, progress_path is the UC Volume location where the progress.json file for our custom progress tracking is stored, and checkpointLocation is the standard Structured Streaming checkpoint location that writeStream still requires. A current_ts column is added to record the ingestion timestamp, and the stream is written in append mode to a Delta table.
On running the query for the first time, some records were ingested into the Delta table. After stopping the query and running it again roughly five minutes later, we can see that it resumed and ingested more records into the table from the last point on, instead of reading from the beginning again.
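If you want to verify the tracked progress between runs, you can fetch the progress.json file from the volume in the same way the reader does; a minimal sketch, assuming the same workspace_url, token and progress path as above:

import requests

# Inspect the progress file that _save_progress maintains in the UC Volume
progress_path = "/Volumes/catalog/schema/volume/custom_data_sources/progress/"
url = f"https://{workspace_url}/api/2.0/fs/files{progress_path}progress.json"

response = requests.get(url, headers={"Authorization": f"Bearer {token}"})
print(response.json())  # e.g. {"current": 20} after two 10-record micro-batches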
In conclusion, this article highlights the approach and the possibilities of using the new PySpark Custom Data Source API to read data from virtually any source in streaming form (assuming the source has a notion of providing data in increments, e.g. sequenced by some id or timestamp). It also demonstrates how to keep track of progress while reading streaming data from such custom sources, since the native checkpointing capability in Structured Streaming doesn't cover this context.