
Autoloader breaks on migration from Community Edition to Premium trial with S3 mount

jhgorse
New Contributor III

In Databricks Community Edition, Auto Loader works using the S3 mount. The S3 mount and Auto Loader code:

# Mount the S3 bucket using the access key and URL-encoded secret key in the URI
dbutils.fs.mount(f"s3a://{access_key}:{encoded_secret_key}@{aws_bucket_name}", f"/mnt/{mount_name}")
from pyspark.sql import SparkSession
from pyspark.sql.functions import from_json, col
 
source_directory = 'dbfs:/mnt/s3-mnt/logs/$aws/things/device/data'
destination_directory = "dbfs:/mnt/s3-mnt/data/davis/delta/data"
checkpoint_path =       "dbfs:/mnt/s3-mnt/data/davis/delta/data_checkpoint"
 
# switched to data_schema2 at s3 timestamp object 1682389110770
# added ac.Timestamp grab
schema = data_schema2
 
streaming_query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    # .option("source", "s3://joe-open/")
#     .option("cloudFiles.schemaLocation", checkpoint_path)
    .schema(schema)
    .option("rescuedDataColumn", "_rescued_data")
    .load(source_directory)
 
    .writeStream
    .format("delta")
    .option("path", destination_directory)
    .option("checkpointLocation", checkpoint_path)
    .option("cloudFiles.schemaEvolutionMode", "True")
    .option("mergeSchema", "true")
 
    .trigger(availableNow=True)
    .start()
)
 
streaming_query.awaitTermination()

In the Premium trial workspace, the same query fails with:

---------------------------------------------------------------------------
StreamingQueryException                   Traceback (most recent call last)
File <command-3092456776679220>:38
     15 schema = data_schema2
     17 streaming_query = (spark.readStream
     18     .format("cloudFiles")
     19     .option("cloudFiles.format", "json")
   (...)
     35     .start()
     36 )
---> 38 streaming_query.awaitTermination()
 
File /databricks/spark/python/pyspark/sql/streaming/query.py:201, in StreamingQuery.awaitTermination(self, timeout)
    199     return self._jsq.awaitTermination(int(timeout * 1000))
    200 else:
--> 201     return self._jsq.awaitTermination()
 
File /databricks/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py:1322, in JavaMember.__call__(self, *args)
   1316 command = proto.CALL_COMMAND_NAME +\
   1317     self.command_header +\
   1318     args_command +\
   1319     proto.END_COMMAND_PART
   1321 answer = self.gateway_client.send_command(command)
-> 1322 return_value = get_return_value(
   1323     answer, self.gateway_client, self.target_id, self.name)
   1325 for temp_arg in temp_args:
   1326     if hasattr(temp_arg, "_detach"):
 
File /databricks/spark/python/pyspark/errors/exceptions/captured.py:168, in capture_sql_exception.<locals>.deco(*a, **kw)
    164 converted = convert_exception(e.java_exception)
    165 if not isinstance(converted, UnknownException):
    166     # Hide where the exception came from that shows a non-Pythonic
    167     # JVM exception message.
--> 168     raise converted from None
    169 else:
    170     raise
 
StreamingQueryException: [STREAM_FAILED] Query [id = ba24256e-c098-4c9c-9672-a96898104770, runId = b9037af2-98b8-4669-944f-7559adac1b57] terminated with exception: The bucket in the file event `{"backfill":{"bucket":"dbfsv1-files","key":"mnt/s3-mnt/logs/$aws/things/device/data/1682993996652","size":12304,"eventTime":1682993997000}}` is different from expected by the source: `[s3 bucket name]`.
...
NOTE: [s3 bucket name] is my redaction of the actual S3 bucket name.

What does this error mean? How do I resume Auto Loader ingestion after migrating from Community Edition to a paid Databricks workspace?

1 ACCEPTED SOLUTION


jhgorse
New Contributor III

Solved with:

spark.conf.set("spark.databricks.cloudFiles.checkSourceChanged", False)

Nothing else seemed to work, including the Auto Loader path-rename option.
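
A minimal sketch of how this fits together, assuming the same paths and schema (data_schema2) from the question above; the config needs to be set before the stream is restarted against the existing checkpoint:

# Allow the stream to keep using the old checkpoint even though the bucket
# recorded in the checkpoint's file events no longer matches the new source path.
spark.conf.set("spark.databricks.cloudFiles.checkSourceChanged", False)

# Restart the same Auto Loader query against the existing checkpoint
streaming_query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaEvolutionMode", "rescue")
    .schema(data_schema2)
    .load(source_directory)

    .writeStream
    .format("delta")
    .option("path", destination_directory)
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .start()
)

streaming_query.awaitTermination()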


2 REPLIES


Anonymous
Not applicable

Hi @Joe Gorse,

Thank you for posting your question in our community! We are happy to assist you.

To help us provide you with the most accurate information, could you please take a moment to review the responses and select the one that best answers your question?

This will also help other community members who may have similar questions in the future. Thank you for your participation and let us know if you need any further assistance! 
