I have some DLT pipelines working fine ingesting files from S3. Now I'm trying to build a pipeline to ingest files from GCS using Auto Loader. I'm running Databricks on AWS.
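For reference, my existing S3 pipelines look roughly like the minimal sketch below — the bucket, table name, and options are illustrative placeholders, not my real code:

import dlt

# Illustrative sketch only: names and options are placeholders.
@dlt.table(name="s3_cost_reports_raw")
def s3_cost_reports_raw():
    return (
        spark.readStream
        .format('cloudFiles')
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.inferColumnTypes", "true")
        # No credential options in the code here: S3 access is assumed to be
        # configured at the workspace/cluster level (e.g. an instance profile).
        .load('s3://<some-bucket>/cost-reports/daily/')
    )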
Here is the code I have for the GCS pipeline:
import dlt
import json
from pyspark.sql.functions import col
# Get GCS credentials
gcs_key = json.loads(dbutils.secrets.get(scope="finops", key="[REDACTED]"))
# Get environment from configuration
environment = spark.conf.get("environment")
gcs_bucket_name = "[REDACTED]"
# Define cloud file options
cloudfile = {
    "cloudFiles.format": "csv",
    "cloudFiles.inferColumnTypes": "true",
    "cloudFiles.allowOverwrites": "true",
    # Service-account credentials taken from the secret loaded above
    "cloudFiles.projectId": gcs_key.get("project_id"),
    "cloudFiles.client": gcs_key.get("client_id"),
    "cloudFiles.clientEmail": gcs_key.get("client_email"),
    "cloudFiles.privateKey": gcs_key.get("private_key"),
    "cloudFiles.privateKeyId": gcs_key.get("private_key_id")
}
# Account consumption ("consumo das contas"): data quality expectations
valid_records = {
    "valid_date": "day IS NOT NULL",
    "valid_project": "project_id IS NOT NULL",
    "valid_service": "service_id IS NOT NULL",
}
# Define the streaming view with metadata columns
@dlt.view
@dlt.expect_all_or_drop(valid_records)
def consumo_das_contas_raw():
    return (
        spark.readStream
        .format('cloudFiles')
        .options(**cloudfile)
        .load(f'gs://{gcs_bucket_name}/cost-reports/daily/*/billing_report_*.csv')
        .withColumn("update_timestamp", col("_metadata.file_modification_time"))  # Use file modification time
    )
full_table_name = f"{environment}_[REDACTED].gcp_fin_ops.consumo_das_contas"
dlt.create_streaming_table(full_table_name)
dlt.apply_changes(
    target=full_table_name,
    source="consumo_das_contas_raw",
    keys=["day", "project_id", "service_id"],
    sequence_by="update_timestamp",
    except_column_list=["update_timestamp"],
    stored_as_scd_type=1
)
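For context, the secret in the finops scope is assumed to hold a standard GCP service-account key JSON; the fields the cloudfile options read from it would look roughly like this (placeholder values only):

# Assumed shape of the JSON key stored in the secret (placeholders only);
# these are the fields that gcs_key.get(...) reads above.
example_gcs_key = {
    "type": "service_account",
    "project_id": "<gcp-project-id>",
    "private_key_id": "<private-key-id>",
    "private_key": "-----BEGIN PRIVATE KEY-----\n...\n-----END PRIVATE KEY-----\n",
    "client_email": "<service-account>@<gcp-project-id>.iam.gserviceaccount.com",
    "client_id": "<numeric-client-id>",
}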
When I run it, I get the following error:
py4j.protocol.Py4JJavaError: Traceback (most recent call last):
File "/Workspace/Users/[REDACTED]/.bundle/FinOps/dev/files/src/finops_gcp_pipeline", cell 1, line 41, in consumo_das_contas_raw
.load(f'gs://{gcs_bucket_name}/cost-reports/daily/*/billing_report_*.csv')
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
py4j.protocol.Py4JJavaError: An error occurred while calling o902.load.
: java.io.IOException: Error getting access token from metadata server at: http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token
at shaded.databricks.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:250)
at shaded.databricks.com.google.cloud.hadoop.util.CredentialFactory.getCredential(CredentialFactory.java:389)
at shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.getCredential(GoogleHadoopFileSystemBase.java:1807)
at shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.createGcsFs(GoogleHadoopFileSystemBase.java:1951)
at shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.configure(GoogleHadoopFileSystemBase.java:1936)
at shaded.databricks.com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase.initialize(GoogleHadoopFileSystemBase.java:501)
at com.databricks.common.filesystem.LokiGCFS.initialize(LokiGCFS.scala:37)
...
Caused by: shaded.databricks.com.google.api.client.http.HttpResponseException: 404 Not Found
GET http://169.254.169.254/computeMetadata/v1/instance/service-accounts/default/token
endpoint not available
at shaded.databricks.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:1113)
at shaded.databricks.com.google.cloud.hadoop.util.CredentialFactory$ComputeCredentialWithRetry.executeRefreshToken(CredentialFactory.java:192)
at shaded.databricks.com.google.api.client.auth.oauth2.Credential.refreshToken(Credential.java:470)
at shaded.databricks.com.google.cloud.hadoop.util.CredentialFactory.getCredentialFromMetadataServiceAccount(CredentialFactory.java:247)
... 269 more
It looks like the GCS connector is ignoring the service-account key I pass through the cloudFiles options and is instead trying to fetch an access token from the GCE instance metadata service, which only exists when Databricks runs on GCP. I hope someone has a clue about what is happening here.