AutoLoader File notification mode Configuration with AWS
Options
- Mark as New
- Bookmark
- Subscribe
- Mute
- Subscribe to RSS Feed
- Permalink
- Report Inappropriate Content
01-10-2024 06:33 PM
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import DataFrame, Column
from pyspark.sql.types import Row
import dlt
# Source prefix holding the parquet files, and the location where Auto Loader
# stores its schema information for this stream.
S3_PATH = 's3://datalake-lab/XXXXX/'
S3_SCHEMA = 's3://datalake-lab/XXXXX/schemas/'

# Auto Loader ("cloudFiles") reader settings, gathered in one place.
# useNotifications=true selects file notification mode; region and queueUrl
# point the stream at the SQS queue it reads notifications from.
_autoloader_options = {
    "cloudFiles.format": "parquet",
    "cloudFiles.schemaLocation": S3_SCHEMA,
    "cloudFiles.useNotifications": "true",
    "cloudFiles.region": "ap-northeast-1",
    "cloudFiles.queueUrl": "https://sqs.ap-northeast-1.amazonaws.com/372383439276/databricks-auto-ingest-test",
}

# Build the streaming DataFrame over the S3 prefix and render it in the notebook.
raw_df = (
    spark.readStream.format("cloudFiles")
    .options(**_autoloader_options)
    .load(S3_PATH)
)
display(raw_df)
[All Policies]
I have attached the following policies to the IAM role that I use as the instance profile for Databricks.
- Instance Profile's Policies
# Policy 1
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:*"
],
"Resource": [
"arn:aws:s3:::datalake-lab"
]
},
{
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:GetObject",
"s3:DeleteObject",
"s3:PutObjectAcl"
],
"Resource": [
"arn:aws:s3:::datalake-lab/*"
]
}
]
}
# Policy 2
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "DatabricksAutoLoaderSetup",
"Effect": "Allow",
"Action": [
"s3:GetBucketNotification",
"s3:PutBucketNotification",
"sns:ListSubscriptionsByTopic",
"sns:GetTopicAttributes",
"sns:SetTopicAttributes",
"sns:CreateTopic",
"sns:TagResource",
"sns:Publish",
"sns:Subscribe",
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes",
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": [
"arn:aws:s3:::datalake-lab",
"arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test"
]
},
{
"Sid": "DatabricksAutoLoaderList",
"Effect": "Allow",
"Action": [
"sqs:ListQueues",
"sqs:ListQueueTags",
"sns:ListTopics"
],
"Resource": "*"
},
{
"Sid": "DatabricksAutoLoaderTeardown",
"Effect": "Allow",
"Action": [
"sns:Unsubscribe",
"sns:DeleteTopic",
"sqs:DeleteQueue"
],
"Resource": [
"arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test"
]
}
]
}
- SQS Policy
{
"Version": "2012-10-17",
"Id": "PolicyID1234567890",
"Statement": [
{
"Sid": "AllowS3BucketNotifications",
"Effect": "Allow",
"Principal": {
"Service": "s3.amazonaws.com"
},
"Action": "sqs:SendMessage",
"Resource": "arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test",
"Condition": {
"ArnLike": {
"aws:SourceArn": "arn:aws:s3:::datalake-lab"
}
}
},
{
"Sid": "AllowDatabricksRoleAccessToSQSPart1",
"Effect": "Allow",
"Principal": {
"AWS": "<Instance Profile ARN>"
},
"Action": [
"sqs:CreateQueue",
"sqs:DeleteMessage",
"sqs:ReceiveMessage",
"sqs:SendMessage",
"sqs:GetQueueUrl",
"sqs:GetQueueAttributes",
"sqs:SetQueueAttributes"
],
"Resource": "arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test"
},
{
"Sid": "AllowDatabricksRoleAccessToSQSPart2",
"Effect": "Allow",
"Principal": {
"AWS": "<Instance Profile ARN>"
},
"Action": [
"sqs:TagQueue",
"sqs:ChangeMessageVisibility"
],
"Resource": "arn:aws:sqs:<region>:<user>:databricks-auto-ingest-test"
}
]
}
I encountered the following error.
> com.amazonaws.services.sqs.model.AmazonSQSException: User: anonymous is not authorized to perform: sqs:receivemessage on resource: arn:aws:sqs:<region-name>:<user>:databricks-auto-ingest-test because no resource-based policy allows the sqs:receivemessage action (Service: AmazonSQS; Status Code: 403; Error Code: AccessDenied;: null)
The documentation I followed is: https://docs.databricks.com/en/ingestion/auto-loader/file-notification-mode.html
Labels:
- Labels:
-
Workflows