In Databricks (dbx) Community Edition, Auto Loader works for me against an S3 mount. The bucket is mounted with:
dbutils.fs.mount(f"s3a://{access_key}:{encoded_secret_key}@{aws_bucket_name}", f"/mnt/{mount_name}")
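In full, the mount setup looks roughly like this (a sketch with placeholder values; dbutils is available implicitly in a Databricks notebook, and the secret key is URL-encoded so any "/" in it doesn't break the s3a URI):

from urllib.parse import quote

# Placeholder AWS credentials -- substitute your own IAM user's keys
access_key = "<AWS_ACCESS_KEY_ID>"
secret_key = "<AWS_SECRET_ACCESS_KEY>"
encoded_secret_key = quote(secret_key, safe="")  # URL-encode the secret so it is safe inside the s3a URI
aws_bucket_name = "<s3-bucket-name>"
mount_name = "s3-mnt"

# Mount the bucket under /mnt/s3-mnt (visible as dbfs:/mnt/s3-mnt)
dbutils.fs.mount(
    f"s3a://{access_key}:{encoded_secret_key}@{aws_bucket_name}",
    f"/mnt/{mount_name}"
)

The Auto Loader stream then reads JSON files from a directory under that mount point: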
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
source_directory = 'dbfs:/mnt/s3-mnt/logs/$aws/things/device/data'
destination_directory = "dbfs:/mnt/s3-mnt/data/davis/delta/data"
checkpoint_path = "dbfs:/mnt/s3-mnt/data/davis/delta/data_checkpoint"
# switched to data_schema2 at s3 timestamp object 1682389110770
# added ac.Timestamp grab
schema = data_schema2
streaming_query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    # .option("source", "s3://joe-open/")
    # .option("cloudFiles.schemaLocation", checkpoint_path)
    .schema(schema)
    .option("rescuedDataColumn", "_rescued_data")
    .load(source_directory)
    .writeStream
    .format("delta")
    .option("path", destination_directory)
    .option("checkpointLocation", checkpoint_path)
    .option("cloudFiles.schemaEvolutionMode", "True")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .start()
)
streaming_query.awaitTermination()
In a Premium trial workspace, the same code fails with:
---------------------------------------------------------------------------
StreamingQueryException Traceback (most recent call last)
File <command-3092456776679220>:38
15 schema = data_schema2
17 streaming_query = (spark.readStream
18 .format("cloudFiles")
19 .option("cloudFiles.format", "json")
(...)
35 .start()
36 )
---> 38 streaming_query.awaitTermination()
File /databricks/spark/python/pyspark/sql/streaming/query.py:201, in StreamingQuery.awaitTermination(self, timeout)
199 return self._jsq.awaitTermination(int(timeout * 1000))
200 else:
--> 201 return self._jsq.awaitTermination()
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:168, in capture_sql_exception.<locals>.deco(*a, **kw)
164 converted = convert_exception(e.java_exception)
165 if not isinstance(converted, UnknownException):
166 # Hide where the exception came from that shows a non-Pythonic
167 # JVM exception message.
--> 168 raise converted from None
169 else:
170 raise
StreamingQueryException: [STREAM_FAILED] Query [id = ba24256e-c098-4c9c-9672-a96898104770, runId = b9037af2-98b8-4669-944f-7559adac1b57] terminated with exception: The bucket in the file event `{"backfill":{"bucket":"dbfsv1-files","key":"mnt/s3-mnt/logs/$aws/things/device/data/1682993996652","size":12304,"eventTime":1682993997000}}` is different from expected by the source: `[s3 bucket name]`.
...
NOTE: [s3 bucket name] is a placeholder where I have scrubbed the actual S3 bucket name.
What does this error mean, and how do I resume Auto Loader ingestion when moving from Community Edition to a paid Databricks workspace?