Problem with Autoloader, S3, and wildcard

Raymond_Garcia
Contributor II

Hello, I have a fairly standard Auto Loader job in which a file path variable points to an S3 bucket. Example 2 runs successfully, but example 1 throws an exception.

Source 1 always throws an exception. Source 2 works, but it also throws an exception when I use a more generic path such as ???-??/??-??.

If anybody has a clue how to solve this, it would be very helpful. Thanks in advance!

example 1: val file_path = "/mnt/output/raw/source1/????-??/??-??/*.e.ndjson"

example 2: val file_path = "/mnt/output/raw/source2/2022-11/14-??/*.e.ndjson"
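For reference, here is a rough sketch of what the two patterns match. It assumes that java.nio's `glob:` syntax behaves like the Hadoop-style globbing Auto Loader applies to its load path for `?` and `*` (each matches within a single path segment); that is an approximation, not Auto Loader's actual matcher. `GlobDemo` and its `literalPrefix` helper are hypothetical names: the helper only shows how much of each path is fixed before the first wildcard.

```scala
import java.nio.file.{FileSystems, Paths}

object GlobDemo {
  // Approximation: java.nio "glob:" semantics stand in for Auto Loader's
  // Hadoop-style globbing. `?` matches exactly one character and `*` matches
  // any run of characters; neither crosses a '/' boundary.
  def matches(pattern: String, path: String): Boolean =
    FileSystems.getDefault
      .getPathMatcher("glob:" + pattern)
      .matches(Paths.get(path))

  // Hypothetical helper: keep the leading path segments that contain no
  // wildcard characters, i.e. the part of the location that is fixed.
  def literalPrefix(pattern: String): String = {
    val wildcard = Set('?', '*', '[', '{')
    pattern
      .split("/", -1)
      .takeWhile(segment => !segment.exists(wildcard.contains))
      .mkString("", "/", "/")
  }
}
```

For instance, `GlobDemo.matches("????-??/??-??/*.e.ndjson", "2022-11/14-03/part1.e.ndjson")` is true, while example 2's pattern `2022-11/14-??/*.e.ndjson` rejects a `2022-12/14-03/...` path. `literalPrefix` returns `/mnt/output/raw/source1/` for example 1 but `/mnt/output/raw/source2/2022-11/` for example 2, so the generic pattern leaves a much broader fixed location.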

com.amazonaws.services.s3.model.AmazonS3Exception: Unable to validate the following destination configurations (Service: Amazon S3; Status Code: 400; Error Code: InvalidArgument;

or

Configuration is ambiguously defined. Cannot have overlapping suffixes in two rules if the prefixes are overlapping for the same event type. (Service: Amazon S3; Status Code: 400; Error Code: InvalidArgument;
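The second message spells out the S3 rule being violated. Below is a simplified sketch of that check (our reading of the message, not AWS code). Two event-notification rules conflict when they cover the same event type, their prefixes overlap (one is a prefix of the other; an empty prefix overlaps everything), and their suffixes overlap in the same way. `NotificationRule`, `S3RuleOverlap`, and the keys used below are hypothetical. Event types are compared by exact string only, whereas S3 also treats a wildcard type such as `s3:ObjectCreated:*` as overlapping the specific ones. A broader wildcard path plausibly leads to a broader prefix, which is more likely to collide with a rule left over from an earlier stream on the same bucket.

```scala
// Hypothetical model of one S3 event-notification rule (filter prefix and
// suffix for a given event type); field names are illustrative only.
final case class NotificationRule(eventType: String, prefix: String, suffix: String)

object S3RuleOverlap {
  // Prefixes overlap when one starts with the other ("" overlaps everything).
  def prefixesOverlap(a: String, b: String): Boolean =
    a.startsWith(b) || b.startsWith(a)

  // Suffixes overlap when one ends with the other ("" overlaps everything).
  def suffixesOverlap(a: String, b: String): Boolean =
    a.endsWith(b) || b.endsWith(a)

  // Mirrors the error text: same event type (exact match only in this
  // sketch), overlapping prefixes AND overlapping suffixes => rejected.
  def conflict(r1: NotificationRule, r2: NotificationRule): Boolean =
    r1.eventType == r2.eventType &&
      prefixesOverlap(r1.prefix, r2.prefix) &&
      suffixesOverlap(r1.suffix, r2.suffix)
}
```

For example, a rule on prefix `raw/source1/` with suffix `.e.ndjson` conflicts with a catch-all rule on `raw/` with no suffix, but not with a rule on `raw/source2/`.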

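import org.apache.spark.sql.streaming.Trigger

// Stream new files from S3 into Kafka, with Auto Loader in file-notification
// mode (SNS/SQS events) instead of directory listing.
val query = spark.readStream
  .format("cloudFiles")
  .option("cloudFiles.format", "text")                // each line lands in a single `value` column
  .option("cloudFiles.schemaLocation", checkpoint_path)
  .option("cloudFiles.useNotifications", true)        // Auto Loader creates the notification resources
  .load(file_path)
  .selectExpr("value")                                // the Kafka sink expects a `value` column
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("topic", "test_topic_3")
  .option("checkpointLocation", checkpoint_path)
  .trigger(Trigger.AvailableNow)
  .start()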

Accepted Solutions

Raymond_Garcia
Contributor II

The error turned out to be mostly on the AWS side, so we deleted and cleared the existing SQS and SNS resources. We also used CloudFilesAWSResourceManager to set up the notification services:

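import com.databricks.sql.CloudFilesAWSResourceManager

// Manage Auto Loader's notification resources explicitly.
// `filePath` is the S3 path being ingested and `notificationServices` is the
// resource suffix passed to setUpNotificationServices (neither shown in the post).
val manager = CloudFilesAWSResourceManager
  .newManager
  .option("path", filePath)
  .create()
manager.setUpNotificationServices(notificationServices)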
