I'm trying to work through a simple Structured Streaming example on a directory created as a Volume. The use case is purely educational; I'm investigating the various trigger types. Basic info:
Catalog: "dev_catalog"
Schema: "stream"
Volume: "streaming_basics"
Custom variable "source": "/Volumes/dev_catalog/stream/streaming_basics/"
When I run a cell that calls writeStream(), passing the path I want to use for checkpointing, Databricks inserts the default schema into the path instead of the desired "stream" schema. The cell contains the following code:
print(source)
writestream = (df.writeStream
    .option('checkpointLocation', f'{source}AppendCheckpoint')
    .outputMode('append')
    .queryName('DefaultTrigger')
    .toTable('stream.AppendTable')
)
The console outputs the following (the value of the 'source' variable, followed by the error message):
/Volumes/dev_catalog/stream/streaming_basics/
[PATH_NOT_FOUND] Path does not exist: /Volumes/dev_catalog/default/streaming_basics/. SQLSTATE: 42K03
This is baffling to me.
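In case it helps, I assume a direct listing of the path (using dbutils.fs.ls, which as far as I know accepts Volume paths) would show whether the location itself resolves:
# Sanity check: list the contents of the Volume path that the 'source' variable points to
print(source)
display(dbutils.fs.ls(source))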
Additional info: earlier in the notebook, I run the following two cells:
%sql
DROP DATABASE IF EXISTS stream CASCADE;
CREATE DATABASE IF NOT EXISTS stream;
%sql
USE SCHEMA stream;
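For completeness, I assume the active catalog and schema after these cells can be confirmed with the built-in SQL functions current_catalog() and current_schema(), which I understand are available in Databricks SQL:
%sql
-- Sanity check: which catalog and schema does the session currently resolve to?
SELECT current_catalog(), current_schema();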
The docs say that "DATABASE" is an alias for "SCHEMA", so the two can be used interchangeably. Even so, this shouldn't matter here, since the schema "stream" is specified explicitly in my 'source' variable. According to the documentation on Volumes, any directory or file in a Volume is addressed as:
/Volumes/<catalog_identifier>/<schema_identifier>/<volume_identifier>/<path>/<file_name>
This aligns with the path I've specified in the "source" variable.
More context: I've previously run another educational Structured Streaming example in a different notebook on the same cluster, in the "dev_catalog.default" schema, with a volume of the same name. The checkpointLocation that writeStream apparently resolves to now matches the location I specified in that earlier example. Between sessions I stopped the cluster and started it again. I mention this because the only explanation I can come up with, based on the error message, is that the cluster cached information from the previous streaming job (in the default schema) and is now reusing it: either the value of the "source" variable or something inside the writeStream() call itself.
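If it's relevant, I assume any leftover query from that earlier notebook would be visible through the standard PySpark StreamingQueryManager, so a check along these lines should show whether something is still running:
# Diagnostic sketch: list any streaming queries still active on this cluster
for q in spark.streams.active:
    print(q.name, q.id, q.isActive)
    # q.stop()  # uncomment to stop a leftover query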
Any explanation for this behaviour would be much appreciated, as well as tips on how to overcome it.