The issue with your DLT pipeline is that although you've defined the table and schema correctly, your `ingest_from_storage()` function is never wired into the pipeline: you've created the function, but you never call it from a `@dlt.table`-decorated function, which is why the table is created but remains empty.
How to Fix Your Code
You need to modify your code to either:
1. Call the `ingest_from_storage()` function from inside a `@dlt.table`-decorated function, or
2. Move your data loading logic directly into the `@dlt.table`-decorated function
Here's how you can fix it:
```python
import dlt
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

storage_account_name = "<Storage_Account>"   # placeholder
storage_account_key = "<account-key>"        # placeholder
container_name = "e2e-data"
blob_file_path = "Test/tabl/tabl.csv"

# The abfss:// scheme goes through the ADLS Gen2 (dfs) endpoint,
# so the account key must be set for dfs.core.windows.net.
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key
)

csv_file_path = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/{blob_file_path}"

schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True),
])

@dlt.table(
    comment="Raw data loaded from cloud storage",
    name="main.default.table_name",
    table_properties={
        "quality": "bronze"
    }
)
def load_raw_data():
    return (
        spark.read.format("csv")
        .option("header", "true")
        .schema(schema)
        .load(csv_file_path)
    )
```
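Before running the pipeline, it can also help to confirm that the file is readable at all. A minimal sketch you might run in a regular notebook cell (outside DLT), reusing the `csv_file_path` and `schema` defined above:
```python
# Sanity check outside the DLT pipeline: read the CSV directly and
# confirm it actually contains rows before debugging the table definition.
raw_df = (
    spark.read.format("csv")
    .option("header", "true")
    .schema(schema)
    .load(csv_file_path)
)
print(f"Rows read from source: {raw_df.count()}")
raw_df.show(5, truncate=False)
```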
Alternatively, you can keep your existing function and call it from the `@dlt.table`-decorated function:
```python
@dlt.table(
    comment="Raw data loaded from cloud storage",
    name="main.default.table_name",
    table_properties={
        "quality": "bronze"
    }
)
def load_raw_data():
    return ingest_from_storage()
```
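For this to work, `ingest_from_storage()` must return a Spark DataFrame rather than writing data itself. As a rough sketch only (your actual function may look different), it could be as simple as:
```python
def ingest_from_storage():
    # Assumed shape of your existing function: it should simply return the
    # DataFrame so the @dlt.table-decorated function can hand it to DLT.
    return (
        spark.read.format("csv")
        .option("header", "true")
        .schema(schema)
        .load(csv_file_path)
    )
```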
Additional Considerations
1. Make sure your Azure Blob Storage credentials and path are correct; note that the `abfss://` scheme pairs with the `dfs.core.windows.net` endpoint, not `blob.core.windows.net`
2. Verify that the CSV file exists at the specified path and has data
3. If using Unity Catalog, ensure you have the proper permissions to write to the target catalog and schema
4. For debugging, you can add print statements or DLT expectations and check the pipeline event log to verify the data is being read correctly (see the sketch after this list)
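For point 4, here is a hedged sketch of how you might combine a row-count print (visible in the driver logs) with a DLT expectation (surfaced in the pipeline event log); the constraint name and column are illustrative:
```python
@dlt.table(
    comment="Raw data loaded from cloud storage",
    name="main.default.table_name",
    table_properties={"quality": "bronze"}
)
@dlt.expect_or_fail("col1_not_null", "col1 IS NOT NULL")  # fails the update if any row has a NULL col1
def load_raw_data():
    df = (
        spark.read.format("csv")
        .option("header", "true")
        .schema(schema)
        .load(csv_file_path)
    )
    # Debugging only: count() forces an extra read, so remove it once the pipeline is healthy.
    print(f"Rows read from source: {df.count()}")
    return df
```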
Remember that in DLT, the function decorated with `@dlt.table` needs to return a DataFrame containing the data you want written to the table. Your current code defines a function that reads the data but never uses that function anywhere in the DLT pipeline flow.