Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Ingest files from GCS with Auto Loader in DLT pipeline running on AWS

fscaravelli
New Contributor

I have some DLT pipelines working fine ingesting files from S3. Now I'm trying to build a pipeline to ingest files from GCS using Auto Loader. I'm running Databricks on AWS.

The code I have:

import dlt
import json
from pyspark.sql.functions import col

# Get GCS credentials
gcs_key = json.loads(dbutils.secrets.get(scope="finops", key="[REDACTED]"))

# Get environment from configuration
environment = spark.conf.get("environment")

gcs_bucket_name = "[REDACTED]"

# Define cloud file options
cloudfile = {
    "cloudFiles.format": "csv",
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.allowOverwrites": "true",
    "cloudFiles.projectId": gcs_key.get("project_id"),
    "cloudFiles.client": gcs_key.get("client_id"),
    "cloudFiles.clientEmail": gcs_key.get("client_email"),
    "cloudFiles.privateKey": gcs_key.get("private_key"),
    "cloudFiles.privateKeyId": gcs_key.get("private_key_id")
}

# Account consumption (consumo das contas)

valid_records = {
    "valid_date": "day IS NOT NULL",
    "valid_project": "project_id IS NOT NULL",
    "valid_service": "service_id IS NOT NULL",
}

# Define the streaming view with metadata columns
@dlt.view
@dlt.expect_all_or_drop(valid_records)
def consumo_das_contas_raw():
    return (
        spark.readStream
        .format('cloudFiles')
        .options(**cloudfile)
        .load(f'gs://{gcs_bucket_name}/cost-reports/daily/*/billing_report_*.csv')
        .withColumn("update_timestamp", col("_metadata.file_modification_time"))  # Use file modification time
    )

full_table_name = f"{environment}_[REDACTED].gcp_fin_ops.consumo_das_contas"

dlt.create_streaming_table(full_table_name)

dlt.apply_changes(
  target = full_table_name,
  source = "consumo_das_contas_raw",
  keys = ["day", "project_id", "service_id"],
  sequence_by = "update_timestamp",
  except_column_list = ["update_timestamp"],
  stored_as_scd_type = 1
)

When I run it, I get this error:

py4j.protocol.Py4JJavaError: Traceback (most recent call last):
  File "/Workspace/Users/[REDACTED]/.bundle/FinOps/dev/files/src/finops_gcp_pipeline", cell 1, line 41, in consumo_das_contas_raw
    .load(f'gs://{gcs_bucket_name}/cost-reports/daily/*/billing_report_*.csv')
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

py4j.protocol.Py4JJavaError: An error occurred while calling o902.load.
: java.io.IOException: Error getting access token from metadata server at: http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token
	at shaded.databricks.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:250)
	at shaded.databricks.com.google.cloud.hadoop.util.CredentialFactory.getCredential(CredentialFactory.java:389)
	at shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1807)
	at shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createGcsFs(GoogleHadoopFileSystemBase.java:1951)
	at shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1936)
	at shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:501)
	at com.databricks.common.filesystem.LokiGCFS.initialize(LokiGCFS.scala:37)
	...
Caused by: shaded.databricks.com.google.api.client.http.HttpResponseException: 404 Not Found
GET http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token
endpoint not available
	at shaded.databricks.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1113)
	at shaded.databricks.com.google.cloud.hadoop.util.CredentialFactory$ComputeCredentialWithRetry.executeRefreshToken(CredentialFactory.java:192)
	at shaded.databricks.com.google.api.client.auth.oauth2.Credential.refreshToken(Credential.java:470)
	at shaded.databricks.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:247)
	... 269 more

It looks like it's trying to use an authentication mechanism that only exists in Databricks on GCP to connect. I hope someone has a clue about what is happening.


mark_ott
Databricks Employee

Your error comes from how Databricks on AWS is trying to access GCS: it's defaulting to the GCP metadata server (which only exists on Google Cloud VMs) instead of the service account key you provided. This is a common issue when connecting to GCS from non-GCP platforms like Databricks on AWS.

Why This Happens

  • By default, the Databricks GCS connector tries to obtain credentials from the GCE metadata server unless it is configured otherwise.

  • On AWS that metadata server doesn't exist (hence the 404 you see), so authentication fails.

  • Merely supplying the credentials in Auto Loader's .options() dictionary is often not enough; the Hadoop GCS connector expects them as Spark Hadoop configuration properties.

Solution: Configure GCS Credentials in Spark/Hadoop

You need to set your GCS credentials in the Hadoop configuration, not just as cloudFiles.* options.
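
If you'd rather not write the key JSON to disk at all, the same credentials can also be supplied as the GCS connector's individual Hadoop properties. This is only a minimal sketch, assuming the gcs_key dictionary you already parse from your secret; the property names are the Hadoop GCS connector's, and they can equally be placed in the cluster or DLT pipeline Spark configuration rather than set at runtime:

# Sketch only: supply the service-account fields directly as Hadoop properties
# (gcs_key is the dict parsed from the secret in your pipeline code).
spark.conf.set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
spark.conf.set("spark.hadoop.fs.gs.project.id", gcs_key["project_id"])
spark.conf.set("spark.hadoop.fs.gs.auth.service.account.email", gcs_key["client_email"])
spark.conf.set("spark.hadoop.fs.gs.auth.service.account.private.key", gcs_key["private_key"])
spark.conf.set("spark.hadoop.fs.gs.auth.service.account.private.key.id", gcs_key["private_key_id"])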

Steps to Fix:

  1. Set Hadoop Options Globally
    Before you read from GCS, add this in your notebook or cluster initialization:

    spark.conf.set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
    spark.conf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/dbfs/tmp/gcs-key.json")

    • Dump your key JSON to a file (ephemeral ok for testing):

      with open("/dbfs/tmp/gcs-key.json", "w") as f:
          f.write(json.dumps(gcs_key))
  2. Remove Auth Info from .options()
    You don't need to pass the individual credential fields as cloudFiles.* options; keep only the ingestion settings (cloudFiles.format, etc.) there. The keyfile method is preferred for the Hadoop connector. See the trimmed options sketch after these steps.

  3. Restart Your Cluster if Needed
    Hadoop configs may require a restart if set at cluster level.

  4. Double Check Permissions
    Ensure your GCP service account actually has Storage Object Viewer (and perhaps Storage Object Admin for writing) on the target bucket.
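
For step 2, a sketch of what the trimmed Auto Loader options from your pipeline would look like; only ingestion settings remain, since authentication now lives entirely in the Hadoop configuration:

# Only ingestion-related settings; no credential fields needed here.
cloudfile = {
    "cloudFiles.format": "csv",
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.allowOverwrites": "true"
}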

Sample Minimal Correct Setup

import dlt
import json

# gcs_key and gcs_bucket_name are defined as in your original pipeline

# Write GCP key to DBFS
with open("/dbfs/tmp/gcs-key.json", "w") as f:
    f.write(json.dumps(gcs_key))

spark.conf.set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
spark.conf.set("spark.hadoop.google.cloud.auth.service.account.json.keyfile", "/dbfs/tmp/gcs-key.json")

cloudfile = {
    "cloudFiles.format": "csv",
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.allowOverwrites": "true"
}

@dlt.view
def consumo_das_contas_raw():
    return (
        spark.readStream
        .format('cloudFiles')
        .options(**cloudfile)
        .load(f'gs://{gcs_bucket_name}/cost-reports/daily/*/billing_report_*.csv')
    )

This setup should allow Databricks on AWS to read files from GCS using your provided service account key, sidestepping the metadata API.
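
Before running the full pipeline, a quick sanity check is to list the bucket path from a notebook on a cluster carrying the same Spark configuration. This is only a sketch, reusing the bucket path from your pipeline:

# If the credentials are wired up correctly, this lists the daily report folders
# instead of failing with the metadata-server error.
display(dbutils.fs.ls(f"gs://{gcs_bucket_name}/cost-reports/daily/"))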


 

Security Note

If you need to do this securely, keep the key in a secrets manager (as you already do with dbutils.secrets), inject paths and keys from there, and clean up the temp key file after use.
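
A rough sketch of that pattern, assuming the key stays in your existing finops secret scope and using an illustrative temp path; note the key file must remain in place while the stream is running, so remove it only after the pipeline update finishes:

import json
import os

key_path = "/dbfs/tmp/gcs-key.json"  # illustrative location only

# Materialize the key from the secret scope just for the duration of the run
gcs_key = json.loads(dbutils.secrets.get(scope="finops", key="<your-secret-key-name>"))
with open(key_path, "w") as f:
    f.write(json.dumps(gcs_key))

# ... trigger / run the pipeline update here ...

# Clean up the temp key file once the update has finished
if os.path.exists(key_path):
    os.remove(key_path)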