
DLT Pipeline

Anuradha_Mel
New Contributor

Hello,

I have written the simple code below to write data to a catalog table using a simple DLT pipeline.

In this program I am reading a file from a blob container and trying to write it to a catalog table. The new catalog table gets created, but it does not contain any data. Do I need to explicitly add a write command in the code, or is there an alternative way to do this?

 


```python
import dlt
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

storage_account_name = <Storage_Account>
storage_account_key = <account-key>
container_name = "e2e-data"
blob_file_path = "Test/tab1/tab1.csv"

# Configure access to the storage account
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_key
)

csv_file_path = f"abfss://{container_name}@{storage_account_name}.blob.core.windows.net/{blob_file_path}"

# Explicit schema for the CSV file
schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True)
])

@dlt.table(
    comment="Raw data loaded from cloud storage",
    name="main.default.table_name",
    table_properties={
        "quality": "bronze"
    }
)
def ingest_from_storage():
    return (
        spark.read.format("csv")
        .option("header", "true")
        .schema(schema)
        .load(csv_file_path)
    )
```

1 REPLY

BigRoux
Databricks Employee

The issue with your DLT pipeline is that, while you've defined the table and schema correctly, the data loading logic in your `ingest_from_storage()` function is never actually executed as part of the pipeline flow. The function is defined, but it isn't invoked anywhere in your code, which is why the table is created but remains empty.

How to Fix Your Code

You need to modify your code to either:

1. Call the `ingest_from_storage()` function from a function decorated with `@dlt.table`, or
2. Move your data loading logic directly into the `@dlt.table`-decorated function

Here's how you can fix it:

```python
import dlt
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

storage_account_name = <Storage_Account>
storage_account_key = <account-key>
container_name = "e2e-data"
blob_file_path = "Test/tab1/tab1.csv"

spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_key
)

csv_file_path = f"abfss://{container_name}@{storage_account_name}.blob.core.windows.net/{blob_file_path}"

schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True)
])

@dlt.table(
    comment="Raw data loaded from cloud storage",
    name="main.default.table_name",
    table_properties={
        "quality": "bronze"
    }
)
def load_raw_data():
    return (
        spark.read.format("csv")
        .option("header", "true")
        .schema(schema)
        .load(csv_file_path)
    )
```

Alternatively, you can keep your existing `ingest_from_storage()` function and call it from a function decorated with `@dlt.table`:

```python
@dlt.table(
    comment="Raw data loaded from cloud storage",
    name="main.default.table_name",
    table_properties={
        "quality": "bronze"
    }
)
def load_raw_data():
    return ingest_from_storage()
```

Additional Considerations

1. Make sure your Azure Blob Storage credentials and path are correct
2. Verify that the CSV file exists at the specified path and has data
3. If using Unity Catalog, ensure you have the proper permissions to write to the target catalog and schema
4. For debugging, you can add print statements or use the DLT pipeline event log to verify the data is being read correctly (see the sketch after this list)
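
If you want to rule out an empty or unreadable source file before re-running the pipeline, a quick interactive check can help. The following is a minimal sketch for an ordinary (non-DLT) notebook, assuming the same `csv_file_path`, storage credentials, and schema shown in the snippets above; if the count comes back as 0, the problem is the file or path rather than the pipeline.

```python
# Sanity check to run in a regular notebook, not inside the DLT pipeline.
# Assumes csv_file_path and the storage account config have already been set
# exactly as in the snippets above.
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True)
])

df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load(csv_file_path)
)

print(df.count())  # should be greater than 0 if the file contains data rows
df.show(5)         # inspect a few rows to confirm the columns parsed as expected
```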

Remember that in DLT, the table definition (`@dlt.table`) needs to return a DataFrame that contains the data you want to write to the table. Your current code defines the function to read the data but doesn't actually use that function anywhere in the DLT pipeline flow.
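
As a tiny, self-contained illustration of that contract (the table name and data here are made up purely for demonstration), any function decorated with `@dlt.table` has the DataFrame it returns materialized as the target table:

```python
import dlt

# Illustrative only: DLT writes whatever DataFrame this function returns
# into the table named in the decorator.
@dlt.table(
    comment="Tiny demo table",
    name="main.default.dlt_contract_demo"  # hypothetical name for illustration
)
def dlt_contract_demo():
    return spark.createDataFrame(
        [(1, "a"), (2, "b")],
        ["id", "value"]
    )
```

No explicit write command is needed; DLT performs the write when the pipeline runs.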
