Data Engineering

Ingest files from GCS with Auto Loader in DLT pipeline running on AWS

fscaravelli
New Contributor

I have some DLT pipelines that work fine ingesting files from S3. Now I'm trying to build a pipeline that ingests files from GCS using Auto Loader. I'm running Databricks on AWS.

Here is the code I have:

import dlt
import json
from pyspark.sql.functions import col

# Get GCS credentials
gcs_key = json.loads(dbutils.secrets.get(scope="finops", key="[REDACTED]"))

# Get environment from configuration
environment = spark.conf.get("environment")

gcs_bucket_name = "[REDACTED]"

# Define cloud file options
cloudfile = {
    "cloudFiles.format": "csv",
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.allowOverwrites": "true",
    "cloudFiles.projectId": gcs_key.get("project_id"),
    "cloudFiles.client": gcs_key.get("client_id"),
    "cloudFiles.clientEmail": gcs_key.get("client_email"),
    "cloudFiles.privateKey": gcs_key.get("private_key"),
    "cloudFiles.privateKeyId": gcs_key.get("private_key_id")
}

# Account consumption (consumo das contas)

valid_records = {
    "valid_date": "day IS NOT NULL",
    "valid_project": "project_id IS NOT NULL",
    "valid_service": "service_id IS NOT NULL",
}

# Define the streaming view with metadata columns
@dlt.view
@dlt.expect_all_or_drop(valid_records)
def consumo_das_contas_raw():
    return (
        spark.readStream
        .format('cloudFiles')
        .options(**cloudfile)
        .load(f'gs://{gcs_bucket_name}/cost-reports/daily/*/billing_report_*.csv')
        .withColumn("update_timestamp", col("_metadata.file_modification_time"))  # Use file modification time
    )

full_table_name = f"{environment}_[REDACTED].gcp_fin_ops.consumo_das_contas"

dlt.create_streaming_table(full_table_name)

dlt.apply_changes(
  target = full_table_name,
  source = "consumo_das_contas_raw",
  keys = ["day", "project_id", "service_id"],
  sequence_by = "update_timestamp",
  except_column_list = ["update_timestamp"],
  stored_as_scd_type = 1
)

When I run it, I get the following error:

py4j.protocol.Py4JJavaError: Traceback (most recent call last):
  File "/Workspace/Users/[REDACTED]/.bundle/FinOps/dev/files/src/finops_gcp_pipeline", cell 1, line 41, in consumo_das_contas_raw
    .load(f'gs://{gcs_bucket_name}/cost-reports/daily/*/billing_report_*.csv')
     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

py4j.protocol.Py4JJavaError: An error occurred while calling o902.load.
: java.io.IOException: Error getting access token from metadata server at: http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token
	at shaded.databricks.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:250)
	at shaded.databricks.com.google.cloud.hadoop.util.CredentialFactory.getCredential(CredentialFactory.java:389)
	at shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1807)
	at shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createGcsFs(GoogleHadoopFileSystemBase.java:1951)
	at shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1936)
	at shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:501)
	at com.databricks.common.filesystem.LokiGCFS.initialize(LokiGCFS.scala:37)
	...
Caused by: shaded.databricks.com.google.api.client.http.HttpResponseException: 404 Not Found
GET http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token
endpoint not available
	at shaded.databricks.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1113)
	at shaded.databricks.com.google.cloud.hadoop.util.CredentialFactory$ComputeCredentialWithRetry.executeRefreshToken(CredentialFactory.java:192)
	at shaded.databricks.com.google.api.client.auth.oauth2.Credential.refreshToken(Credential.java:470)
	at shaded.databricks.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:247)
	... 269 more

It looks like the GCS filesystem is trying to authenticate through the GCE instance metadata server (http://169.254.169.254), which only exists when Databricks runs on GCP, instead of using the service account credentials I'm passing in the cloudFiles options. I hope someone has a clue about what is happening.
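
For reference, the kind of Spark-level GCS auth configuration I would expect the underlying gs:// filesystem to need looks roughly like the sketch below. This is an assumption on my part, not something I have verified: the keys are the ones documented for connecting Databricks to GCS with a Google service account, and they may need to be set in the pipeline's cluster Spark config rather than in the pipeline code itself.

# Unverified sketch: hand the same service account key to the Hadoop GCS
# connector so it does not fall back to the GCE instance metadata server.
spark.conf.set("spark.hadoop.google.cloud.auth.service.account.enable", "true")
spark.conf.set("spark.hadoop.fs.gs.project.id", gcs_key.get("project_id"))
spark.conf.set("spark.hadoop.fs.gs.auth.service.account.email", gcs_key.get("client_email"))
spark.conf.set("spark.hadoop.fs.gs.auth.service.account.private.key", gcs_key.get("private_key"))
spark.conf.set("spark.hadoop.fs.gs.auth.service.account.private.key.id", gcs_key.get("private_key_id"))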
