DLT Pipeline
3 weeks ago
Hello,
I have written the simple code below to write data to a catalog table using a simple DLT pipeline.
As part of this program, I am reading a file from a blob container and trying to write it to a catalog table. The new catalog table got created, but it does not have any data in it. Do I need to explicitly give a write command in the code, or is there an alternative way to do it?
```python
import dlt

storage_account_name = <Storage_Account>
storage_account_key = <account-key>
container_name = "e2e-data"
blob_file_path = "Test/tab1/tab1.csv"

spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_key
)

csv_file_path = f"abfss://{container_name}@{storage_account_name}.blob.core.windows.net/{blob_file_path}"

schema = StructType([StructField("col1", IntegerType(), True), StructField("col2", StringType(), True), StructField("col3", StringType(), True), StructField("col4", StringType(), True), StructField("col5", StringType(), True)])

@dlt.table(
    comment="Raw data loaded from cloud storage",
    name="main.default.table_name",
    table_properties={
        "quality": "bronze"
    }
)
def ingest_from_storage():
    return (
        spark.read.format("csv")
        .option("header", "true")
        .schema(schema)
        .load(csv_file_path)
    )
```
3 weeks ago
The issue isn't a missing write command: you've defined the table and schema correctly, and DLT calls the `@dlt.table`-decorated function for you and writes whatever DataFrame it returns. If the table is created but stays empty, the DataFrame being returned contains no rows, which usually means the read against cloud storage isn't picking up the data you expect.
How to Fix Your Code
Keep the read logic inside (or called from) the decorated function and make sure it actually returns rows. You can either:
1. Keep the data loading logic directly inside the `@dlt.table`-decorated function, as in your original code, or
2. Move the read into a plain helper function and call that helper from the decorated function.
Here's the first variant, with the missing schema imports added:
```python
import dlt
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

storage_account_name = "<Storage_Account>"
storage_account_key = "<account-key>"
container_name = "e2e-data"
blob_file_path = "Test/tab1/tab1.csv"

spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
    storage_account_key
)

# Note: abfss:// is normally paired with the dfs.core.windows.net (ADLS Gen2) endpoint;
# wasbs:// is the scheme that pairs with blob.core.windows.net.
csv_file_path = f"abfss://{container_name}@{storage_account_name}.blob.core.windows.net/{blob_file_path}"

schema = StructType([
    StructField("col1", IntegerType(), True),
    StructField("col2", StringType(), True),
    StructField("col3", StringType(), True),
    StructField("col4", StringType(), True),
    StructField("col5", StringType(), True),
])

@dlt.table(
    comment="Raw data loaded from cloud storage",
    name="main.default.table_name",
    table_properties={
        "quality": "bronze"
    }
)
def load_raw_data():
    return (
        spark.read.format("csv")
        .option("header", "true")
        .schema(schema)
        .load(csv_file_path)
    )
```
Alternatively, you can keep `ingest_from_storage()` as a plain helper (without the decorator) and call it from the decorated table function:
```python
@dlt.table(
    comment="Raw data loaded from cloud storage",
    name="main.default.table_name",
    table_properties={
        "quality": "bronze"
    }
)
def load_raw_data():
    # ingest_from_storage() stays a plain, undecorated helper that returns the DataFrame
    return ingest_from_storage()
```
Additional Considerations
1. Make sure your Azure storage credentials and path are correct. Note that the `abfss://` scheme is normally used with the `dfs.core.windows.net` (ADLS Gen2) endpoint, while `wasbs://` pairs with `blob.core.windows.net`.
2. Verify that the CSV file exists at the specified path and contains data rows beyond the header.
3. If you're using Unity Catalog, make sure you have permission to create and write to the target catalog and schema.
4. For debugging, read the file in a regular notebook first, or check the DLT pipeline event log, to verify the data is being read correctly (see the sketch after this list).
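A minimal sanity check for point 4, run in an ordinary notebook rather than inside the pipeline, might look like the sketch below. The account name, key, and path are placeholders taken from the snippet above, and the `dfs.core.windows.net` endpoint is an assumption that your storage account is ADLS Gen2.
```python
# Hypothetical sanity check outside DLT; replace the placeholders with real values.
storage_account_name = "<Storage_Account>"
storage_account_key = "<account-key>"

# Assumes an ADLS Gen2 account, hence the dfs endpoint.
spark.conf.set(
    f"fs.azure.account.key.{storage_account_name}.dfs.core.windows.net",
    storage_account_key
)

path = f"abfss://e2e-data@{storage_account_name}.dfs.core.windows.net/Test/tab1/tab1.csv"

df = spark.read.format("csv").option("header", "true").load(path)
print(df.count())  # should be greater than 0 if the file has data rows
df.show(5)         # eyeball a few rows and the column names
```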
Remember that in DLT the function decorated with `@dlt.table` only needs to return a DataFrame containing the data you want in the table; the pipeline performs the write itself, so no explicit write command is required.
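On the "alternative way" part of your question: if these files arrive repeatedly, a common pattern is to ingest them incrementally with Auto Loader instead of a one-off `spark.read`. The rough sketch below reuses the `schema`, `container_name`, and `storage_account_name` variables from the code above, uses a hypothetical table name, and assumes an ADLS Gen2 account (dfs endpoint).
```python
import dlt

@dlt.table(
    name="main.default.table_name_autoloader",  # hypothetical target table
    comment="Raw CSV files loaded incrementally with Auto Loader",
    table_properties={"quality": "bronze"}
)
def ingest_with_autoloader():
    # Auto Loader (cloudFiles) picks up new files in the folder incrementally.
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("header", "true")
        .schema(schema)
        .load(f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/Test/tab1/")
    )
```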

