MohanaBasak
Databricks Employee

Introduction

This is a step-by-step guide to setting up an AWS cross-account Databricks Autoloader connection in file notification mode. It allows you to automatically load data from an S3 bucket in one AWS account (Account A) into a Databricks workspace in another AWS account (Account B).

[Diagram: cross-account Autoloader architecture, with the source S3 bucket and SNS/SQS resources in Account A and the Databricks workspace with its instance profile in Account B]

Databricks Autoloader can either set up the SNS topic and SQS queue automatically, or you can create these resources manually and point Autoloader at them. In either case, you need an instance profile in Account B that can access the SNS topic and SQS queue in Account A.

Understanding Key Services

Before proceeding, let's clarify the purpose of the relevant services in AWS.

SNS (Simple Notification Service)
SNS is a fully managed pub/sub messaging service that delivers messages from publishers to subscribers. In file notification mode, Autoloader relies on an SNS topic to receive a notification whenever a new file lands in S3.

SQS (Simple Queue Service)
SQS is a fully managed message queuing service that offers scalable, reliable, and distributed message queues. Autoloader uses an SQS queue to durably store the messages delivered by SNS. When the Autoloader stream starts, it processes messages from the queue to identify new files.
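
To make this flow concrete, the sketch below shows how a consumer could drain S3 event notifications that arrive in an SQS queue via SNS. This is not Autoloader's internal code, just an illustration of the S3 -> SNS -> SQS path; the queue URL is a hypothetical placeholder.

import json
import boto3

# Hypothetical queue URL; Autoloader manages its own queue in file notification mode.
queue_url = "https://sqs.<aws_region>.amazonaws.com/<account_a_id>/example-queue"

sqs = boto3.client("sqs", region_name="<aws_region>")
resp = sqs.receive_message(QueueUrl=queue_url, MaxNumberOfMessages=10, WaitTimeSeconds=5)

for msg in resp.get("Messages", []):
    # Each SQS body wraps an SNS envelope whose "Message" field holds the S3 event.
    sns_envelope = json.loads(msg["Body"])
    s3_event = json.loads(sns_envelope["Message"])
    for record in s3_event.get("Records", []):
        print(record["s3"]["bucket"]["name"], record["s3"]["object"]["key"])
    # Delete the message so the notification is not redelivered.
    sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=msg["ReceiptHandle"])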

Role-Based Access to Buckets
To access S3 buckets in another AWS account, you need to define IAM roles with policies that grant the necessary permissions. These roles establish trust relationships and ensure secure access to resources across accounts.

Trust Relationship
A trust relationship in AWS IAM defines which entities are trusted to assume a particular IAM role. When setting up cross-account access, trust relationships determine which accounts or entities can assume roles in other accounts.

Bucket Policy
A bucket policy in AWS S3 sets permissions for objects within a bucket, controlling access at the bucket and object level. It's written in JSON format and specifies who can access the bucket and what actions they can perform.

Create S3 bucket and cross-account instance profile

Now that we understand the relevant AWS services, we can get started setting up a cross-account Autoloader connection. 

  1. Create an S3 bucket in Account A. An example name could be acc-a-autol-input.
  2. Create an instance profile in Account B (refer to steps 1 to 4 under Step 1: Create an instance profile using the AWS console).
  3. Add the policy provided below to the Account B instance profile role to access the bucket in Account A. This policy gives the instance profile created in Step 2 access to the S3 bucket created in Step 1.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListBucket"
                ],
                "Resource": [
                    "arn:aws:s3:::acc-a-autol-input"
                ]
            },
            {
                "Effect": "Allow",
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:DeleteObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": [
                    "arn:aws:s3:::acc-a-autol-input/*"
                ]
            }
        ]
    }
  4. Add an iam:PassRole permission for this instance profile role to the role that created the Databricks deployment (refer to steps 1 to 8 under Step 5: Add the S3 IAM role to the EC2 policy).
  5. In Account A, add the following bucket policy to the bucket created in Step 1. This policy grants the instance profile created in Step 2 (in Account B) access to the bucket from the other account.
    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Sid": "Example permissions",
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account_b_id>:role/acc_b_instance_profile"
                },
                "Action": [
                    "s3:GetBucketLocation",
                    "s3:ListBucket"
                ],
                "Resource": "arn:aws:s3:::acc-a-autol-input"
            },
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::<account_b_id>:role/acc_b_instance_profile"
                },
                "Action": [
                    "s3:PutObject",
                    "s3:GetObject",
                    "s3:DeleteObject",
                    "s3:PutObjectAcl"
                ],
                "Resource": "arn:aws:s3:::acc-a-autol-input/*"
            }
        ]
    }
  6. Execute steps 1 to 5 under Step 6: Add the instance profile to Databricks in the Databricks Workspace console.
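
Once the instance profile is attached to a cluster, it is worth confirming cross-account bucket access from a notebook before wiring up notifications. A minimal check (the test file name is just an example):

# List the Account A bucket using the Account B instance profile attached to the cluster.
display(dbutils.fs.ls("s3://acc-a-autol-input/"))

# Optionally write a small test file and read it back.
dbutils.fs.put("s3://acc-a-autol-input/connectivity-test.txt", "hello from Account B", True)
print(dbutils.fs.head("s3://acc-a-autol-input/connectivity-test.txt"))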

Use Autoloader to create SNS-SQS across accounts

For Autoloader to automatically create the SNS topic and SQS queue, you will need an IAM role with permissions to create SNS and SQS resources:

  1. Create an IAM role in Account A. This will have permissions to auto-create SNS-SQS for Autoloader.
    1. Role name example: acc_a_autol_auto_create_role
    2. Policy name example: acc_a_autol_auto_create_policy – This policy lets Databricks Autoloader create an SNS topic and an SQS queue with the prefix databricks-auto-ingest-* during stream initialization of the Autoloader job.
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "DatabricksAutoLoaderSetup",
                  "Effect": "Allow",
                  "Action": [
                      "s3:GetBucketNotification",
                      "s3:PutBucketNotification",
                      "sns:ListSubscriptionsByTopic",
                      "sns:GetTopicAttributes",
                      "sns:SetTopicAttributes",
                      "sns:CreateTopic",
                      "sns:TagResource",
                      "sns:Publish",
                      "sns:Subscribe",
                      "sqs:CreateQueue",
                      "sqs:DeleteMessage",
                      "sqs:ReceiveMessage",
                      "sqs:SendMessage",
                      "sqs:GetQueueUrl",
                      "sqs:GetQueueAttributes",
                      "sqs:SetQueueAttributes",
                      "sqs:TagQueue",
                      "sqs:ChangeMessageVisibility"
                  ],
                  "Resource": [
                      "arn:aws:s3:::acc-a-autol-input",
                      "arn:aws:sqs:<aws_region>:<account_a_id>:databricks-auto-ingest-*",
                      "arn:aws:sns:<aws_region>:<account_a_id>:databricks-auto-ingest-*"
                  ]
              },
              {
                  "Sid": "DatabricksAutoLoaderList",
                  "Effect": "Allow",
                  "Action": [
                      "sqs:ListQueues",
                      "sqs:ListQueueTags",
                      "sns:ListTopics"
                  ],
                  "Resource": "*"
              },
              {
                  "Sid": "DatabricksAutoLoaderTeardown",
                  "Effect": "Allow",
                  "Action": [
                      "sns:Unsubscribe",
                      "sns:DeleteTopic",
                      "sqs:DeleteQueue"
                  ],
                  "Resource": [
                      "arn:aws:sqs:<aws_region>:<account_a_id>:databricks-auto-ingest-*",
                      "arn:aws:sns:<aws_region>:<account_a_id>:databricks-auto-ingest-*"
                  ]
              }
          ]
      }
    3. Trust Relationship – lets the instance profile assume this role.
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "Statement",
                  "Effect": "Allow",
                  "Principal": {
                      "AWS": "arn:aws:iam::<account_b_id>:role/acc_b_instance_profile"
                  },
                  "Action": "sts:AssumeRole"
              }
          ]
      }
  2. Allow the Account B instance profile to assume this role. Add the following statement to the existing policy of the instance profile IAM role acc_b_instance_profile:
            {
                "Sid": "AssumeRoleAccA",
                "Effect": "Allow",
                "Action": "sts:AssumeRole",
                "Resource": "arn:aws:iam::<account_a_id>:role/acc_a_autol_auto_create_role"
            }
  3. Run and test the Autoloader code from a notebook in your Databricks workspace:
    options = {
       "cloudFiles.format": "parquet",
       "cloudFiles.schemaLocation": "s3://acc-a-autol-input/schema/auto/",
       "cloudFiles.useNotifications": True,
       "cloudFiles.roleArn": "arn:aws:iam::<account_a_id>:role/acc_a_autol_auto_create_role",
    }
    
    df = (
       spark.readStream.format("cloudFiles")
       .options(**options)
       .load("s3://acc-a-autol-input/data/auto/")
    )
    
    (
       df.writeStream.option("checkpointLocation", "s3://acc-a-autol-input/checkpoint/auto1/1/")
       .start("s3://acc-a-autol-input/output/auto/")
    )

This Autoloader code will auto-create an SNS topic and an SQS queue with names similar to:

  • SNS topic: databricks-auto-ingest-query-fb599288-a1ea-4443-8d31-ed66d86fd9aa-source-0
  • SQS queue: databricks-auto-ingest-query-fb599288-a1ea-4443-8d31-ed66d86fd9aa-source-0

These are created once, during the stream initialization of the Autoloader.
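
To confirm that the cross-account pieces are wired up, you can assume the Account A role from a notebook and look for the auto-created queue. A minimal sketch using boto3 (replace the placeholders with your region and account ID):

import boto3

# Assume the Account A role used by Autoloader (the same ARN as cloudFiles.roleArn above).
sts = boto3.client("sts")
creds = sts.assume_role(
    RoleArn="arn:aws:iam::<account_a_id>:role/acc_a_autol_auto_create_role",
    RoleSessionName="autoloader-check",
)["Credentials"]

# List the SQS queues that Autoloader created in Account A.
sqs = boto3.client(
    "sqs",
    region_name="<aws_region>",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
)
print(sqs.list_queues(QueueNamePrefix="databricks-auto-ingest").get("QueueUrls", []))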

Manually create SNS-SQS for cross-account Autoloader

If you want to manually create the SNS topic and SQS queue and link them to Autoloader, follow these steps (a boto3 sketch that scripts steps 1 to 3 appears after the list):

  1. Create an SNS topic in Account A called acc_a_autol_sns.
    1. Type: Standard
    2. Access policy that allows the S3 bucket in Account A to publish events to this topic:
      {
        "Version": "2008-10-17",
        "Id": "notificationPolicy",
        "Statement": [
          {
            "Sid": "allowS3Notification",
            "Effect": "Allow",
            "Principal": {
              "AWS": "*"
            },
            "Action": "SNS:Publish",
            "Resource": "arn:aws:sns:<aws_region>:<account_a_id>:acc_a_autol_sns",
            "Condition": {
              "ArnLike": {
                "aws:SourceArn": "arn:aws:s3:*:*:acc-a-autol-input"
              }
            }
          }
        ]
      }
  2. Create an SQS queue in Account A called acc_a_autol_sqs.
    1. Type: Standard
    2. Access policy that allows the SNS topic to send messages to this queue:
      {
        "Version": "2008-10-17",
        "Id": "notificationPolicy",
        "Statement": [
          {
            "Sid": "allowS3Notification",
            "Effect": "Allow",
            "Principal": {
              "AWS": "*"
            },
            "Action": "SQS:SendMessage",
            "Resource": "arn:aws:sqs:<aws_region>:<account_a_id>:acc_a_autol_sqs",
            "Condition": {
              "ArnLike": {
                "aws:SourceArn": "arn:aws:sns:<aws_region>:<account_a_id>:acc_a_autol_sns"
              }
            }
          }
        ]
      }
    3. Subscribe this queue to the SNS topic created above.
  3. Create an S3 bucket event notification on acc-a-autol-input:
    1. Name: databricks-autol-events
    2. Prefix: data/manual/
    3. Event types: all object create events
    4. Destination: the ARN of the SNS topic created in Step 1
  4. Create an IAM role in Account A that gives Autoloader access to the manually created SNS topic and SQS queue
    1. Role Name: acc_a_autol_manual_create_role
    2. Policy Name: acc_a_autol_manual_create_policy – This policy gives Databricks Autoloader access to the SNS topic and SQS queue created in Steps 1 and 2.
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "DatabricksAutoLoaderUse",
                  "Effect": "Allow",
                  "Action": [
                      "s3:GetBucketNotification",
                      "sns:ListSubscriptionsByTopic",
                      "sns:GetTopicAttributes",
                      "sns:TagResource",
                      "sns:Publish",
                      "sqs:DeleteMessage",
                      "sqs:ReceiveMessage",
                      "sqs:SendMessage",
                      "sqs:GetQueueUrl",
                      "sqs:GetQueueAttributes",
                      "sqs:TagQueue",
                      "sqs:ChangeMessageVisibility"
                  ],
                  "Resource": [
                      "arn:aws:sqs:<aws_region>:<account_a_id>:acc_a_autol_sqs",
                      "arn:aws:sns:<aws_region>:<account_a_id>:acc_a_autol_sns",
                      "arn:aws:s3:::acc-a-autol-input"
                  ]
              }
          ]
      }
    3. Trust Relationship – lets the instance profile assume this role.
      {
          "Version": "2012-10-17",
          "Statement": [
              {
                  "Sid": "Statement",
                  "Effect": "Allow",
                  "Principal": {
                      "AWS": "arn:aws:iam::<account_b_id>:role/acc_b_instance_profile"
                  },
                  "Action": "sts:AssumeRole"
              }
          ]
      }
    4. Add the following statement to the existing policy of the instance profile IAM role acc_b_instance_profile:
      {
          "Sid": "AssumeManualRoleAccA",
          "Effect": "Allow",
          "Action": "sts:AssumeRole",
          "Resource": "arn:aws:iam::<account_a_id>:role/acc_a_autol_manual_create_role"
      }
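
If you prefer to script the console work in steps 1 to 3 above, the boto3 sketch below does the same setup. It assumes it is run with Account A admin credentials; note that put_bucket_notification_configuration replaces any existing notification configuration on the bucket.

import json
import boto3

region = "<aws_region>"
bucket = "acc-a-autol-input"

sns = boto3.client("sns", region_name=region)
sqs = boto3.client("sqs", region_name=region)
s3 = boto3.client("s3", region_name=region)

# Step 1: SNS topic that the S3 bucket is allowed to publish to.
topic_arn = sns.create_topic(Name="acc_a_autol_sns")["TopicArn"]
sns.set_topic_attributes(
    TopicArn=topic_arn,
    AttributeName="Policy",
    AttributeValue=json.dumps({
        "Version": "2008-10-17",
        "Statement": [{
            "Sid": "allowS3Notification",
            "Effect": "Allow",
            "Principal": {"AWS": "*"},
            "Action": "SNS:Publish",
            "Resource": topic_arn,
            "Condition": {"ArnLike": {"aws:SourceArn": f"arn:aws:s3:*:*:{bucket}"}},
        }],
    }),
)

# Step 2: SQS queue that the SNS topic is allowed to send to, subscribed to the topic.
queue_url = sqs.create_queue(QueueName="acc_a_autol_sqs")["QueueUrl"]
queue_arn = sqs.get_queue_attributes(
    QueueUrl=queue_url, AttributeNames=["QueueArn"]
)["Attributes"]["QueueArn"]
sqs.set_queue_attributes(
    QueueUrl=queue_url,
    Attributes={"Policy": json.dumps({
        "Version": "2008-10-17",
        "Statement": [{
            "Sid": "allowS3Notification",
            "Effect": "Allow",
            "Principal": {"AWS": "*"},
            "Action": "SQS:SendMessage",
            "Resource": queue_arn,
            "Condition": {"ArnLike": {"aws:SourceArn": topic_arn}},
        }],
    })},
)
sns.subscribe(TopicArn=topic_arn, Protocol="sqs", Endpoint=queue_arn)

# Step 3: S3 event notification for new objects under data/manual/.
# Note: this call replaces the bucket's existing notification configuration.
s3.put_bucket_notification_configuration(
    Bucket=bucket,
    NotificationConfiguration={
        "TopicConfigurations": [{
            "Id": "databricks-autol-events",
            "TopicArn": topic_arn,
            "Events": ["s3:ObjectCreated:*"],
            "Filter": {"Key": {"FilterRules": [{"Name": "prefix", "Value": "data/manual/"}]}},
        }],
    },
)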

Test cross-account Autoloader connection

Run and test the Autoloader code from a notebook in Databricks Workspace:

 

options = {
   "cloudFiles.format": "parquet",
   "cloudFiles.schemaLocation": "s3://acc-a-autol-input/schema/manual/",
   "cloudFiles.useNotifications": True,
   "cloudFiles.roleArn": "arn:aws:iam::<account_a_id>:role/acc_a_autol_manual_create_role",
   "cloudFiles.queueUrl": "https://sqs.<aws_region>.amazonaws.com/<account_a_id>/acc_a_autol_sqs"
}

df = (
   spark.readStream.format("cloudFiles")
   .options(**options)
   .load("s3://acc-a-autol-input/data/manual/")
)

(
   df.writeStream.option("checkpointLocation", "s3://acc-a-autol-input/checkpoint/manual1/1/")
   .start("s3://acc-a-autol-input/output/manual/")
)
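
If the stream starts but no new files are picked up, a quick check is whether notifications are actually reaching the queue. A minimal sketch, assuming the same Account A role can be assumed from the notebook:

import boto3

# Assume the Account A role used by Autoloader and check the queue depth.
sts = boto3.client("sts")
creds = sts.assume_role(
    RoleArn="arn:aws:iam::<account_a_id>:role/acc_a_autol_manual_create_role",
    RoleSessionName="queue-depth-check",
)["Credentials"]

sqs = boto3.client(
    "sqs",
    region_name="<aws_region>",
    aws_access_key_id=creds["AccessKeyId"],
    aws_secret_access_key=creds["SecretAccessKey"],
    aws_session_token=creds["SessionToken"],
)
attrs = sqs.get_queue_attributes(
    QueueUrl="https://sqs.<aws_region>.amazonaws.com/<account_a_id>/acc_a_autol_sqs",
    AttributeNames=["ApproximateNumberOfMessages"],
)
print("Messages waiting:", attrs["Attributes"]["ApproximateNumberOfMessages"])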

 

 

By following these steps, you should be able to successfully set up a cross-account Autoloader connection with Databricks.

2 Comments
iddoi
New Contributor II

Hello!

Thanks a lot for this excellent and detailed write up. I have tried to configure this myself but for some reason couldn't make it work.

My cluster was able to assume the instance profile in Account B successfully. I was then able to use boto3 in a Notebook to assume the role in Account A and to get the appropriate S3 object from Account A.

 

import boto3

sts_client = boto3.client('sts')
assumed_role_object = sts_client.assume_role(
    RoleArn=role_arn,
    RoleSessionName=session_name,
    ExternalId=external_id
)

# Get the temporary credentials
credentials = assumed_role_object['Credentials']

# Create an S3 client using the temporary credentials
s3_client = boto3.client(
    's3',
    aws_access_key_id=credentials['AccessKeyId'],
    aws_secret_access_key=credentials['SecretAccessKey'],
    aws_session_token=credentials['SessionToken']
)

# Try to get the object
response = s3_client.get_object(Bucket=bucket_name, Key=file_key)
    
# Read some of the content
content = response['Body'].read(1024)  # Read first 1KB
print(f"Successfully accessed the file. First 1KB of content:\n{content}")

 

I then attempted to use spark for this and got an AccessDeniedException:

 

from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("S3DataReadTest").getOrCreate()

# Configure Spark to use AWS IAM role
spark.conf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.auth.AssumedRoleCredentialProvider")
spark.conf.set("spark.hadoop.fs.s3a.assumed.role.arn", role_arn)
spark.conf.set("spark.hadoop.fs.s3a.assumed.role.session.name", role_session_name)
spark.conf.set("spark.hadoop.fs.s3a.assumed.role.external.id", role_external_id)
spark.conf.set("spark.hadoop.fs.s3a.assumed.role.sts.endpoint", "sts.amazonaws.com")

# Read the data
data_df = spark.read.format("csv").load(s3_bucket_path)

 

I also tried to use the temporary credentials I got from the successful assume_role command in boto3, but that didn't work either:

 

spark = SparkSession.builder \
    .config("spark.hadoop.fs.s3a.access.key", credentials['AccessKeyId']) \
    .config("spark.hadoop.fs.s3a.secret.key", credentials['SecretAccessKey']) \
    .config("spark.hadoop.fs.s3a.session.token", credentials['SessionToken']) \
    .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.com.amazonaws.services.s3.enableV4", "true") \
    .config("spark.hadoop.fs.s3a.endpoint", "s3.us-east-1.amazonaws.com") \
    .getOrCreate()

# Attempt to read the file
print(f"Attempting to read file: {s3_path}")
df = spark.read.csv(s3_path, header=True, inferSchema=True)

 

Any idea what the issue could be? Any insight would be appreciated! 🙏

 

 

* By the way, for some reason the boto3 access worked for me when using a private cluster, but didn't work on a shared cluster (even though the instance profile was assumed in both cases).

When running this on a shared cluster:

 

# Assume the role
sts_client = boto3.client('sts')
assumed_role_object = sts_client.assume_role(
    RoleArn=role_arn,
    RoleSessionName=session_name,
    ExternalId=external_id
)

 

I get the following error:

NoCredentialsError: Unable to locate credentials

 

MohanaBasak
Databricks Employee

@iddoi This blog is specifically about the Databricks Autoloader setup using an instance profile. If you are trying to read data from an S3 bucket (either in the same AWS account or a different one) in a Databricks notebook, the recommended approach is to use Unity Catalog (UC). With UC, you do not need an instance profile or any Spark configs. Once you have set up a connection to the S3 bucket with UC (a storage credential and an external location), you can read from that bucket directly.

If you are not using UC and are connecting with an instance profile instead, the bucket in the other AWS account will be accessible as long as the instance profile permissions are set up correctly; in that case you also do not need any Spark configs.

That said, I was able to read from an S3 bucket by using your boto3 STS code to get temporary credentials, and then setting hadoop credentials in this way:

# Temporary credentials returned by the boto3 assume_role call
aws_access_key_id=credentials['AccessKeyId']
aws_secret_access_key=credentials['SecretAccessKey']
aws_session_token=credentials['SessionToken']

# Point the S3A filesystem at the temporary credentials
sc._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider")
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", aws_access_key_id)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", aws_secret_access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.session.token", aws_session_token)

data_df = spark.read.format("csv").load('s3://acc-a-autol-input/rnd-2022-notes.csv')
display(data_df)

I hope this helps. Again, Unity Catalog is the recommended way of reading data from S3 to Databricks.