cancel
Showing results for 
Search instead for 
Did you mean: 
Data Engineering
cancel
Showing results for 
Search instead for 
Did you mean: 

AutoLoader File notification mode Configuration with AWS

rt-slowth
Contributor

 

 

 

from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import DataFrame, Column
from pyspark.sql.types import Row
import dlt

S3_PATH = 's3://datalake-lab/XXXXX/'
S3_SCHEMA = 's3://datalake-lab/XXXXX/schemas/'

raw_df = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", f"{S3_SCHEMA}")
    .option("cloudFiles.useNotifications", "true")
    .option("cloudFiles.region", "ap-northeast-1")
    .option(
        "cloudFiles.queueUrl",
        "https://sqs.ap-northeast-1.amazonaws.com/372383439276/databricks-auto-ingest-test",
    )
    .load(f"{S3_PATH}")  
)


display(raw_df) 

 

 

 

 

 

[All Policy]
I am applying the following policy to the IAM that I am using as an instance profile for databricks.

-  Instance Profile's Policies

 

# Policy 1
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:*"
            ],
            "Resource": [
                "arn:aws:s3::datalake-lab"
            ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:GetObject",
                "s3:DeleteObject",
                "s3:PutObjectAcl"
            ],
            "Resource": [
                "arn:aws:s3:::datalake-lab/*"
            ]
        }
    ]
}

# Policy 2
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "DatabricksAutoLoaderSetup",
            "Effect": "Allow",
            "Action": [
                "s3:GetBucketNotification",
                "s3:PutBucketNotification",
                "sns:ListSubscriptionsByTopic",
                "sns:GetTopicAttributes",
                "sns:SetTopicAttributes",
                "sns:CreateTopic",
                "sns:TagResource",
                "sns:Publish",
                "sns:Subscribe",
                "sqs:CreateQueue",
                "sqs:DeleteMessage",
                "sqs:ReceiveMessage",
                "sqs:SendMessage",
                "sqs:GetQueueUrl",
                "sqs:GetQueueAttributes",
                "sqs:SetQueueAttributes",
                "sqs:TagQueue",
                "sqs:ChangeMessageVisibility"
            ],
            "Resource": [
                "arn:aws:s3:::datalake-lab",
                "arn:aws:sqs:<reegion>:<user>:databricks-auto-ingest-test"
            ]
        },
        {
            "Sid": "DatabricksAutoLoaderList",
            "Effect": "Allow",
            "Action": [
                "sqs:ListQueues",
                "sqs:ListQueueTags",
                "sns:ListTopics"
            ],
            "Resource": "*"
        },
        {
            "Sid": "DatabricksAutoLoaderTeardown",
            "Effect": "Allow",
            "Action": [
                "sns:Unsubscribe",
                "sns:DeleteTopic",
                "sqs:DeleteQueue"
            ],
            "Resource": [
                "arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test"
            ]
        }
    ]
}

 

 

 - SQS Policy
 

 

{
  "Version": "2012-10-17",
  "Id": "PolicyID1234567890",
  "Statement": [
    {
      "Sid": "AllowS3BucketNotifications",
      "Effect": "Allow",
      "Principal": {
        "Service": "s3.amazonaws.com"
      },
      "Action": "sqs:SendMessage",
      "Resource": "arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test",
      "Condition": {
        "ArnLike": {
          "aws:SourceArn": "arn:aws:s3:::ktown4u-datalake-lab"
        }
      }
    },
    {
      "Sid": "AllowDatabricksRoleAccessToSQSPart1",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<Instance Profile ARN>"
      },
      "Action": [
        "sqs:CreateQueue",
        "sqs:DeleteMessage",
        "sqs:ReceiveMessage",
        "sqs:SendMessage",
        "sqs:GetQueueUrl",
        "sqs:GetQueueAttributes",
        "sqs:SetQueueAttributes"
      ],
      "Resource": "arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test"
    },
    {
      "Sid": "AllowDatabricksRoleAccessToSQSPart2",
      "Effect": "Allow",
      "Principal": {
        "AWS": "<Instance Profile ARN>"
      },
      "Action": [
        "sqs:TagQueue",
        "sqs:ChangeMessageVisibility"
      ],
      "Resource": "arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test"
    }
  ]
}

 

 
 

 

I encountered the following error.

 

> com.amazonaws.services.sqs.model.AmazonSQSException: User: anonymous is not authorized to perform: sqs:receivemessage on resource: arn:aws:sqs:<region-name>:<user>:databricks-auto-ingest-test because no resource-based policy allows the sqs:receivemessage action (Service: AmazonSQS; Status Code: 403; Error Code: AccessDenied;: null)
 
 
5 REPLIES 5

Debayan
Esteemed Contributor III
Esteemed Contributor III

Hi, Looks like this is a resource-based policy permission error. Could you please check if all the actions/permissions has been added properly in the resource-based policy? 

Hi, @Debayan 

How should I check it?

djhs
New Contributor III

Was this resolved? I run into the same issue

@djhs 
Yes, I solved it, I was trying to run File notification Mode on a Shared Cluster and that's what caused the problem.

@rt-slowth , Could you please add little bit more details on how did you fix it ? I have a similar issue, What  did you change to avoid using shared cluster ? 

Welcome to Databricks Community: Lets learn, network and celebrate together

Join our fast-growing data practitioner and expert community of 80K+ members, ready to discover, help and collaborate together while making meaningful connections. 

Click here to register and join today! 

Engage in exciting technical discussions, join a group with your peers and meet our Featured Members.