05-02-2023 06:12 AM
In Databricks Community Edition, Auto Loader works using the S3 mount. The S3 mount and Auto Loader code:
dbutils.fs.mount(f"s3a://{access_key}:{encoded_secret_key}@{aws_bucket_name}", f"/mnt/{mount_name}
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col

source_directory = "dbfs:/mnt/s3-mnt/logs/$aws/things/device/data"
destination_directory = "dbfs:/mnt/s3-mnt/data/davis/delta/data"
checkpoint_path = "dbfs:/mnt/s3-mnt/data/davis/delta/data_checkpoint"

# switched to data_schema2 at s3 timestamp object 1682389110770
# added ac.Timestamp grab
schema = data_schema2

streaming_query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    # .option("source", "s3://joe-open/")
    # .option("cloudFiles.schemaLocation", checkpoint_path)
    .schema(schema)
    .option("rescuedDataColumn", "_rescued_data")
    .load(source_directory)
    .writeStream
    .format("delta")
    .option("path", destination_directory)
    .option("checkpointLocation", checkpoint_path)
    .option("cloudFiles.schemaEvolutionMode", "True")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)
    .start()
)
streaming_query.awaitTermination()
In the Premium trial, the same query fails with:
---------------------------------------------------------------------------
StreamingQueryException Traceback (most recent call last)
File <command-3092456776679220>:38
15 schema = data_schema2
17 streaming_query = (spark.readStream
18 .format("cloudFiles")
19 .option("cloudFiles.format", "json")
(...)
35 .start()
36 )
---> 38 streaming_query.awaitTermination()
File /databricks/spark/python/pyspark/sql/streaming/query.py:201, in StreamingQuery.awaitTermination(self, timeout)
199 return self._jsq.awaitTermination(int(timeout * 1000))
200 else:
--> 201 return self._jsq.awaitTermination()
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
1316 command = proto.CALL_COMMAND_NAME +\
1317 self.command_header +\
1318 args_command +\
1319 proto.END_COMMAND_PART
1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
1323 answer, self.gateway_client, self.target_id, self.name)
1325 for temp_arg in temp_args:
1326 if hasattr(temp_arg, "_detach"):
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:168, in capture_sql_exception.<locals>.deco(*a, **kw)
164 converted = convert_exception(e.java_exception)
165 if not isinstance(converted, UnknownException):
166 # Hide where the exception came from that shows a non-Pythonic
167 # JVM exception message.
--> 168 raise converted from None
169 else:
170 raise
StreamingQueryException: [STREAM_FAILED] Query [id = ba24256e-c098-4c9c-9672-a96898104770, runId = b9037af2-98b8-4669-944f-7559adac1b57] terminated with exception: The bucket in the file event `{"backfill":{"bucket":"dbfsv1-files","key":"mnt/s3-mnt/logs/$aws/things/device/data/1682993996652","size":12304,"eventTime":1682993997000}}` is different from expected by the source: `[s3 bucket name]`.
...
NOTE: [s3 bucket name] is my scrubbing of the actual S3 bucket name.
What does this error mean? How do I resume autoloading after moving from Community Edition to paid Databricks?
Accepted Solutions
05-02-2023 07:49 AM
Solved with:
spark.conf.set("spark.databricks.cloudFiles.checkSourceChanged", False)
Nothing else seemed to work, including the Auto Loader path-rename option.
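For context, a minimal sketch of how this fits together, assuming the conf is set in the same notebook before the stream is restarted. Judging by its name and the error above, the conf appears to relax Auto Loader's check that the source bucket matches the one recorded in the checkpoint; the query itself is abridged from the one in the question:

# Assumption: this conf disables Auto Loader's check that the source
# bucket/path matches the one recorded in the checkpoint, which is what
# the STREAM_FAILED error above complains about after the migration.
spark.conf.set("spark.databricks.cloudFiles.checkSourceChanged", False)

# Then restart the same query against the existing checkpoint
# (abridged; schema, paths, and options as defined in the question).
streaming_query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .schema(schema)
    .load(source_directory)
    .writeStream
    .format("delta")
    .option("path", destination_directory)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .start()
)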
05-18-2023 11:34 PM
Hi @Joe Gorse
Thank you for posting your question in our community! We are happy to assist you.
To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?
This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance!