
AutoLoader File notification mode Configuration with AWS

rt-slowth
Contributor

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import DataFrame, Column
from pyspark.sql.types import Row
import dlt

S3_PATH = 's3://datalake-lab/XXXXX/'
S3_SCHEMA = 's3://datalake-lab/XXXXX/schemas/'

# Auto Loader stream in file notification mode, reading from an existing SQS queue.
raw_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", S3_SCHEMA)
    .option("cloudFiles.useNotifications", "true")  # enable file notification mode
    .option("cloudFiles.region", "ap-northeast-1")
    .option(
        "cloudFiles.queueUrl",
        "https://sqs.ap-northeast-1.amazonaws.com/372383439276/databricks-auto-ingest-test",
    )
    .load(S3_PATH)
)


display(raw_df)

[All Policies]
I have attached the following policies to the IAM role that I use as the instance profile for Databricks.

- Instance Profile Policies

 

# Policy 1
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3:::datalake-lab"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject",
                "s3:PutObjectAcl"
            ],
            "Resource": [
                "arn:aws:s3:::datalake-lab/*"
            ]
        }
    ]
}

# Policy 2
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DatabricksAutoLoaderSetup",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketNotification",
                "s3:PutBucketNotification",
                "sns:ListSubscriptionsByTopic",
                "sns:GetTopicAttributes",
                "sns:SetTopicAttributes",
                "sns:CreateTopic",
                "sns:TagResource",
                "sns:Publish",
                "sns:Subscribe",
                "sqs:CreateQueue",
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:GetQueueUrl",
                "sqs:GetQueueAttributes",
                "sqs:SetQueueAttributes",
                "sqs:TagQueue",
                "sqs:ChangeMessageVisibility"
            ],
            "Resource": [
                "arn:aws:s3:::datalake-lab",
                "arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test"
            ]
        },
        {
            "Sid": "DatabricksAutoLoaderList",
            "Effect": "Allow",
            "Action": [
                "sqs:ListQueues",
                "sqs:ListQueueTags",
                "sns:ListTopics"
            ],
            "Resource": "*"
        },
        {
            "Sid": "DatabricksAutoLoaderTeardown",
            "Effect": "Allow",
            "Action": [
                "sns:Unsubscribe",
                "sns:DeleteTopic",
                "sqs:DeleteQueue"
            ],
            "Resource": [
                "arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test"
            ]
        }
    ]
}
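
A quick sanity check that could be run on identity policies like the ones above, offered as a sketch rather than a definitive validator: it flags resource ARNs that do not have the six colon-separated fields of a well-formed ARN, and actions whose service prefix (for example `sns`) has no resource of the same service in that statement, since such actions match nothing. The function name `lint_statement` and the sample statements are hypothetical, and the check ignores `Condition`, `NotAction`, and `NotResource`.

```python
def _as_list(value):
    """IAM accepts a single string or a list; normalize to a list."""
    return value if isinstance(value, list) else [value]


def lint_statement(stmt):
    """Return human-readable warnings for one IAM policy statement (simplified)."""
    warnings = []
    services = set()
    for arn in _as_list(stmt.get("Resource", [])):
        if arn == "*":
            services.add("*")
            continue
        # Well-formed ARN: arn:partition:service:region:account:resource
        parts = arn.split(":", 5)
        if len(parts) != 6 or parts[0] != "arn":
            warnings.append(f"malformed ARN: {arn}")
            continue
        services.add(parts[2])
    if "*" not in services:
        for action in _as_list(stmt.get("Action", [])):
            service = action.split(":", 1)[0]
            if service != "*" and service not in services:
                warnings.append(f"{action} has no {service} resource in this statement")
    return warnings


# Hypothetical statements for illustration only.
mixed = {
    "Effect": "Allow",
    "Action": ["s3:GetBucketNotification", "sns:CreateTopic", "sqs:ReceiveMessage"],
    "Resource": [
        "arn:aws:s3:::example-bucket",
        "arn:aws:sqs:us-east-1:111122223333:example-queue",
    ],
}
bad_arn = {
    "Effect": "Allow",
    "Action": ["s3:ListBucket"],
    "Resource": ["arn:aws:s3::example-bucket"],  # one colon short
}

for stmt in (mixed, bad_arn):
    for warning in lint_statement(stmt):
        print(warning)
```

Whether a flagged SNS action matters depends on the setup: when an existing queue is supplied through `cloudFiles.queueUrl`, Auto Loader may not need to create SNS resources at all.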

- SQS Policy

{
  "Version": "2012-10-17",
  "Id": "PolicyID1234567890",
  "Statement": [
    {
      "Sid": "AllowS3BucketNotifications",
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "arn:aws:s3:::ktown4u-datalake-lab"
        }
      }
    },
    {
      "Sid": "AllowDatabricksRoleAccessToSQSPart1",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<Instance Profile ARN>"
      },
      "Action": [
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test"
    },
    {
      "Sid": "AllowDatabricksRoleAccessToSQSPart2",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<Instance Profile ARN>"
      },
      "Action": [
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": "arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test"
    }
  ]
}

I encountered the following error.

 

> com.amazonaws.services.sqs.model.AmazonSQSException: User: anonymous is not authorized to perform: sqs:receivemessage on resource: arn:aws:sqs:<region-name>:<user>:databricks-auto-ingest-test because no resource-based policy allows the sqs:receivemessage action (Service: AmazonSQS; Status Code: 403; Error Code: AccessDenied;: null)
 
 
5 REPLIES

Debayan
Esteemed Contributor III

Hi, it looks like this is a resource-based policy permission error. Could you please check whether all the required actions/permissions have been added correctly to the resource-based policy (the SQS queue policy)?

Hi, @Debayan 

How should I check it?
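
One way to check, sketched under assumptions: read the queue's resource-based policy and look for an `Allow` statement that names the instance profile role and covers `sqs:ReceiveMessage`. The helper `allows_action` below is hypothetical and deliberately simplified (it ignores `Condition`, `Deny` statements, `NotAction`, and `Resource`), and the role ARN and sample policy are placeholders. The commented lines show how the live policy could be fetched with boto3's `get_queue_attributes`, which needs AWS credentials.

```python
import fnmatch
import json


def _as_list(value):
    """IAM accepts a single string or a list; normalize to a list."""
    if value is None:
        return []
    return value if isinstance(value, list) else [value]


def allows_action(policy_json, principal_arn, action):
    """True if some Allow statement names principal_arn and covers action (simplified)."""
    policy = json.loads(policy_json)
    for stmt in _as_list(policy.get("Statement")):
        if stmt.get("Effect") != "Allow":
            continue
        principal = stmt.get("Principal")
        if principal == "*":
            principals = ["*"]
        elif isinstance(principal, dict):
            principals = _as_list(principal.get("AWS"))
        else:
            principals = []
        if "*" not in principals and principal_arn not in principals:
            continue
        # Action names are case-insensitive and may contain wildcards.
        for pattern in _as_list(stmt.get("Action")):
            if fnmatch.fnmatchcase(action.lower(), pattern.lower()):
                return True
    return False


# Fetching the live policy (requires boto3 and credentials; QUEUE_URL is a placeholder):
# import boto3
# sqs = boto3.client("sqs", region_name="ap-northeast-1")
# attrs = sqs.get_queue_attributes(QueueUrl=QUEUE_URL, AttributeNames=["Policy"])
# policy_json = attrs.get("Attributes", {}).get("Policy", '{"Statement": []}')

# Hypothetical policy and role ARN for illustration.
ROLE_ARN = "arn:aws:iam::111122223333:role/example-role"
sample_policy = json.dumps({
    "Version": "2012-10-17",
    "Statement": [
        {"Effect": "Allow", "Principal": {"AWS": ROLE_ARN},
         "Action": ["sqs:ReceiveMessage", "sqs:DeleteMessage"], "Resource": "*"},
    ],
})
print(allows_action(sample_policy, ROLE_ARN, "sqs:receivemessage"))
```

If the policy already allows the role, as the SQS policy posted above appears to, the problem is more likely in which identity the request is sent as than in the policy itself.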

djhs
New Contributor III

Was this resolved? I ran into the same issue.

@djhs 
Yes, I solved it. I was trying to run file notification mode on a shared cluster, and that is what caused the problem.

@rt-slowth, could you please add a few more details on how you fixed it? I have a similar issue. What did you change to avoid using a shared cluster?
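
The thread does not record the exact change, so the following is only a sketch of how one might confirm the access mode before rerunning the stream. It assumes a cluster description shaped like a Databricks Clusters API response, where the `data_security_mode` field reports shared access mode as `USER_ISOLATION`; other values may appear in newer workspaces. The helper name and sample dictionaries are hypothetical. One plausible reading of `User: anonymous` in the error is that the SQS request was sent without the instance profile's credentials, which would fit the shared-cluster explanation given above, but that is an inference and not something the thread confirms.

```python
# Value the Clusters API reports for shared access mode (assumption: other
# values, such as SINGLE_USER, indicate a non-shared mode).
SHARED_MODES = {"USER_ISOLATION"}


def is_shared_cluster(cluster_info):
    """cluster_info: dict shaped like a Clusters API cluster description."""
    return cluster_info.get("data_security_mode") in SHARED_MODES


# Hypothetical sample descriptions, trimmed to the relevant fields.
shared = {"cluster_name": "example-shared", "data_security_mode": "USER_ISOLATION"}
single = {"cluster_name": "example-single", "data_security_mode": "SINGLE_USER"}

for info in (shared, single):
    if is_shared_cluster(info):
        print(f"{info['cluster_name']}: shared access mode; try a single-user cluster")
    else:
        print(f"{info['cluster_name']}: not shared access mode")
```

If the cluster turns out to be shared, rerunning the same notebook on a single-user cluster with the instance profile attached is the change the original poster's answer points to.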
