Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Getting an error while using LIVE.target_table in a DLT pipeline

ashraf1395
Valued Contributor

I have created a target table in the same DLT pipeline. But when I read that table in a different block of the notebook with LIVE.table_path, the pipeline is not able to read it.

Here is my code. Code block 1: creating a streaming table.

 

# Define metadata tables
catalog = spark.conf.get("catalog")
entity_table_path = f"{catalog}.metadata.entity"
elements_table_path = f"{catalog}.metadata.elements"
user = spark.conf.get("user")

import sys
sys.path.append(f"/Workspace/Users/{user}/.bundle/wtd_ingestion_framework/dev/files/src/")

# Import necessary modules
import dlt
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, IntegerType
from pyspark.sql.functions import col
from utils.dlt_utils import create_dlt_table, apply_schema_evolution
from core.metadata_management import apply_classification_tags, map_data_type
from utils.logging_handling import log_event, get_logger

# Initialize logger
logger = get_logger()

# Read metadata
entities = spark.read.table(entity_table_path)
elements = spark.read.table(elements_table_path)

# Iterate through each entity (table)
for entity_row in entities.collect():
    table_name = entity_row["Entity_Technical_Name"]
    target_catalog = entity_row["Target_Catalog"]
    target_schema = entity_row["Target_Schema"]
    table_path = f"{target_catalog}.{target_schema}.{table_name}"

    filtered_elements = elements.filter(elements["Source_feed_name"] == table_name).orderBy("Position")

    # Define schema using elements metadata
    schema = StructType([
        StructField(
            row["Entity_Technical_Name"],  # Adjusted to match column key
            map_data_type(row["Datatype"], row["Length"] if "Length" in row else None),
            row["Nullable"] == "yes"
        )
        for row in filtered_elements.collect()  # already filtered by Source_feed_name
    ])

    # Check if the table exists and whether it is empty
    try:
        spark.sql(f"DESCRIBE TABLE {table_path}")
        table_exists = True
        table_is_empty = spark.sql(f"SELECT COUNT(1) FROM {table_path}").collect()[0][0] == 0
    except Exception:
        table_exists = False
        table_is_empty = True

    # If the table exists and is not empty, use it as a streaming source
    if table_exists and not table_is_empty:
        dataframe = spark.readStream.table(table_path)
        create_dlt_table(
            dataframe=dataframe,
            table_name=table_name,
            schema=schema,
            table_type="streaming"
        )
        log_event(logger, f"Streaming table {table_name} initialized with existing data.", "INFO")

    # If the table doesn't exist or is empty, create an empty streaming table
    else:
        create_dlt_table(
            table_name=table_name,
            schema=schema,
            table_type="streaming"
        )
        log_event(logger, f"Created new empty streaming table {table_path}.", "INFO")

    # Apply the classification tags
    # apply_classification_tags(target_catalog, target_schema, table_name)




The part of the create_dlt_table function used in the block above (in a core.py file):

# If neither DataFrame nor input path is provided, create an empty table
        else:
            if not schema:
                raise ValueError("Schema must be provided for empty tables without input_path or DataFrame.")

            @dlt.table(
                name=table_name,
                comment=f"Empty {table_type} DLT table created with schema."
            )
            def empty_table():
                log_event(logger, f"Creating empty {table_type} table {table_name}.", "INFO")

                if table_type == "streaming":
                    # Create a dummy streaming DataFrame with the `rate` source
                    streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()

                    # Map the `rate` source columns to match the schema with default null values
                    empty_stream_df = streaming_df.select(
                        *[lit(None).cast(field.dataType).alias(field.name) for field in schema.fields]
                    ).where("1=0")  # Ensures the stream is empty
                    return empty_stream_df
                else:
                    # Create an empty DataFrame for materialized view
                    return spark.createDataFrame([], schema)

            log_event(logger, f"Empty {table_type} table {table_name} created successfully.", "INFO")

 

Code block 2: ingesting data from Auto Loader into the target table using append_flow and adding some extra columns.

 

# Load parameters
import dlt
target_catalog = spark.conf.get("catalog").lower()
target_schema = spark.conf.get("target_schema").lower()
target_table_name = spark.conf.get("target_table_name").lower()
data_file_path = spark.conf.get("data_file_path")
header = spark.conf.get("header")
file_format = spark.conf.get("file_format")
user = spark.conf.get("user")
evolve_schema = spark.conf.get("evolve_schema")
mode = spark.conf.get("mode")
import sys
sys.path.append(f"/Workspace/Users/{user}/.bundle/wtd_ingestion_framework/dev/files/src/")

# Import necessary modules
from pyspark.sql.types import StructType, StructField
from pyspark.sql import SparkSession
from utils.logging_handling import get_logger, log_event, log_error
from utils.dlt_utils import create_dlt_table
from core.metadata_management import map_data_type
from core.model_extensions import apply_model_extensions

# Initialize logger and Spark session
logger = get_logger()


log_event(logger, "Starting file ingestion process.", "INFO")
table_path = f"{target_catalog}.{target_schema}.{target_table_name}"

# Read the metadata elements table (as in code block 1)
elements = spark.read.table(f"{target_catalog}.metadata.elements")
filtered_elements = elements.filter(elements["Source_feed_name"] == "BB1123_loans").orderBy("Position")

# Define schema using elements metadata
schema = StructType([
    StructField(
        row["Entity_Technical_Name"],  # Adjusted to match column key
        map_data_type(row["Datatype"], row["Length"] if "Length" in row else None),
        row["Nullable"] == "yes"
    )
    for row in filtered_elements.collect()  # already filtered by Source_feed_name
])
log_event(logger, f"Schema: {schema}", "INFO")
# @Dlt.table(name ="test")
# def test_table():
#     return spark.read.table(table_path)
    

header="true"
evolve_schema=True
schema_hints = None
if schema and evolve_schema:
    schema_hints = ",".join([f"{field.name} {field.dataType.simpleString()}" for field in schema.fields])
    # schema_hints = {field.name: field.dataType.simpleString() for field in schema.fields}
log_event(logger, f"Schema hints: {schema_hints}", "INFO")
# Determine schema evolution mode based on header and evolve_schema
if header == "true":
    infer_column_types = "true"  # Infer schema from file
    schema_evolution_mode = "addNewColumns" if evolve_schema else "rescue"
else:
    infer_column_types = "false"  # Do not infer schema
    schema_evolution_mode = "addNewColumns" if evolve_schema else "rescue"

# Define the cloud file configuration
cloudfile = {
    "cloudFiles.format": file_format,
    "cloudFiles.allowOverwrites": "false",
    "cloudFiles.inferColumnTypes": infer_column_types,
    "cloudFiles.schemaLocation": f"{data_file_path}/_schema/{table_name}/",
    "cloudFiles.schemaEvolutionMode": schema_evolution_mode,
    "cloudFiles.schemaHints": schema_hints,
    "cloudFiles.useNotifications": "false",
    "cloudFiles.includeExistingFiles": "false"
}
file_specific_config = {
    "header": header,  # Pass the header option directly
    "delimiter": "|",  # Set pipe as the delimiter
    "quote": "\"",     # Handle quoted fields
    "escape": "\\"     # Use backslash as escape character
}

table_type = "Streaming"
@dlt.append_flow(
    target = target_table_name,
    name = f"{target_table_name}_{file_format}_ingestion",
    # name = f"{target_table_name}",
    comment = f"{mode} Mode {table_type} DLT table created/updated from {data_file_path}."
)
def source_to_bronze():
    input_df = (spark
          .readStream
          .format("cloudfiles")
          .options(**cloudfile)
          .options(**file_specific_config)
          .option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}")
          .load(data_file_path))
    
    # return df.writeStream.trigger(availableNow=True if mode == "batch" else False)
    input_df = apply_model_extensions(input_df,target_catalog,target_schema,target_table_name)
    log_event(logger, f"Inferred columns: {input_df.columns}", "INFO")

    # Write to Delta
    input_df.writeStream \
        .format("delta") \
        .option("mergeSchema", "true") \
        .outputMode("append") \
        .option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}") \
        .start(table_path)

    return input_df


log_event(logger, f"DLT {table_type} table {table_name} created or updated from input path.", "INFO")

 

Code block 3: doing some post-ingestion checks using append_flow again. I want to use expectations on this block as well, but cannot, because expectations are not allowed in append_flow. (Please provide a solution or idea for this as well; one possible approach is sketched after the error output below.)

 

# Load parameters
user = spark.conf.get("user")

import sys
import dlt
from delta.tables import DeltaTable
from pyspark.sql.functions import expr, lit, count

# Add your module paths
sys.path.append(f"/Workspace/Users/{user}/.bundle/wtd_ingestion_framework/dev/files/src/")

# Import necessary modules
from utils.logging_handling import get_logger, log_event
from utils.dlt_utils import create_dlt_table
from utils.helper import get_rules, get_primary_key

# Initialize logger
logger = get_logger()
log_event(logger, "Started Post Ingestion Checks", "INFO")

# Get table details
target_table_name = spark.conf.get("target_table_name")
target_catalog = spark.conf.get("catalog")
target_schema = spark.conf.get("target_schema")
data_file_path = spark.conf.get("data_file_path")
table_path = f"{target_catalog}.{target_schema}.{target_table_name}"

log_event(logger, f"DLT table {table_path}", "INFO")
# @Dlt.expect_all(get_rules(table_path))
@dlt.append_flow(
    target=f"{target_table_name}",
    name = f"{target_table_name}_post_ingestion_checks"
)
def validation_table():
    # Read the source Delta table
    record_count = spark.read.table(f"LIVE.{table_path}").count()
    log_event(logger, f"Record count: {record_count}", "INFO")

    df = spark.readStream.table(f"LIVE.{table_path}")
    # df.withColumn("record_count", lit(record_count))
    

    # Add quarantine flag using the dynamic record_count
    rules = get_rules(table_path)
    log_event(logger, f"Quarantine rules: {list(rules.values())}", "INFO")

    if len(rules) == 1:
        quarantine_rules = f"NOT({list(rules.values())[0]})"
    else:
        quarantine_rules = f"NOT({' OR '.join(rules.values())})"
    log_event(logger, f"Quarantine rules applied: {quarantine_rules}", "INFO")

    df_quarantined = df.withColumn("record_count", lit(record_count)).withColumn("Quarantined_flag", expr(quarantine_rules))
    df_quarantined = df_quarantined.drop("record_count")

    # Write the updated DataFrame
    query = (
        df_quarantined.writeStream.format("delta")
        .option("mergeSchema", "true")
        .option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}")
        .outputMode("append")
        .start(table_path)
    )

 

Getting this error 

 

py4j.protocol.Py4JJavaError: Traceback (most recent call last):
  File "/Workspace/Users/ashraf@wtdanalytics.com/.bundle/wtd_ingestion_framework/dev/files/src/notebooks/load_metadata/task_schema_changes", cell 6, line 31, in validation_table
    record_count = spark.read.table(f"LIVE.{table_path}").count()
                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

py4j.protocol.Py4JJavaError: An error occurred while calling z:com.databricks.pipelines.Pipeline.readBatchInput.
: com.databricks.pipelines.common.errors.DLTSparkException: [DATASET_NOT_DEFINED] Failed to read dataset 'accounts_and_customer.bronze.bb1123_loans'. This dataset is not defined in the pipeline.
If this table is managed by another pipeline, then do not use `dlt.read` / `dlt.readStream` to read the table or prepend the name with the LIVE keyword.
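
On the expectations question from code block 3: since expectations cannot be attached to an @dlt.append_flow directly, one possible approach is to declare them on the target streaming table itself with dlt.create_streaming_table, so that every flow appending into it is validated. Below is a minimal sketch under that assumption; the table name and the rules dictionary are placeholders rather than the framework code above (in this framework the rules would come from get_rules(table_path)).

import dlt

# Placeholder quarantine/validation rules
rules = {
    "valid_loan_id": "loan_id IS NOT NULL",
    "positive_amount": "amount > 0",
}

# Declare the target once and attach the expectations to it.
# Use expect_all_or_drop or expect_all_or_fail instead of expect_all to drop rows or fail the update on violations.
dlt.create_streaming_table(
    name="bb1123_loans",
    comment="Target streaming table; expectations apply to every flow appended into it.",
    expect_all=rules,
)

# Any append flow targeting the table is then checked against those expectations.
@dlt.append_flow(target="bb1123_loans", name="bb1123_loans_ingestion")
def bb1123_loans_flow():
    return spark.readStream.table("LIVE.bb1123_loans_raw")  # placeholder source dataset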

 

 

3 REPLIES

Alberto_Umana
Databricks Employee

Hi @ashraf1395,

The error you are encountering ([DATASET_NOT_DEFINED] Failed to read dataset 'accounts_and_customer.bronze.bb1123_loans'. This dataset is not defined in the pipeline.) indicates that the dataset accounts_and_customer.bronze.bb1123_loans is not defined within the Delta Live Tables (DLT) pipeline.

Here are a few steps to troubleshoot this issue:

  1. Check Dataset Definition: Ensure that the dataset accounts_and_customer.bronze.bb1123_loans is defined within the same DLT pipeline. You can define datasets using the @dlt.table decorator for tables or @dlt.view for views. For example:

import dlt

@dlt.table
def bb1123_loans():
    return spark.read.format("delta").load("/path/to/your/dataset")

  2. Use the Correct Dataset Name: When referencing datasets within the same pipeline, prepend the dataset name with LIVE. For example:

df = spark.readStream.table("LIVE.bb1123_loans")

  3. Pipeline Configuration: Ensure that the pipeline configuration includes the necessary datasets. If the dataset is managed by another pipeline, do not use dlt.read or dlt.readStream to read the table. Instead, use spark.read.table or spark.readStream.table with the appropriate table name.
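
For instance, a short sketch using the table names from this thread (the fully qualified name comes from the error message above):

# Dataset defined in the same pipeline: reference it by its DLT dataset name with the LIVE keyword
df_same_pipeline = spark.readStream.table("LIVE.bb1123_loans")

# Table managed by another pipeline (or any table outside this pipeline): use the fully
# qualified name and do not prepend LIVE
df_other_pipeline = spark.readStream.table("accounts_and_customer.bronze.bb1123_loans")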

ashraf1395
Valued Contributor

The dataset is defined in the same pipeline in code block 1 using the @dlt.table decorator, and it is then appended to in the second block with @dlt.append_flow. In the third block I am reading it using LIVE.target_table_name, but it keeps failing. The table names, cases, everything seems correct.

ashraf1395
Valued Contributor

Can't we use LIVE.table_name on a target DLT table that has the @dlt.append_flow decorator?

If yes, can you share the code, because when I tried it I got an error.
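
A minimal sketch of the pattern being asked about, assuming the target table is defined in the same pipeline (the table name bb1123_loans, the flow name, and the source path below are placeholders rather than the framework code from the question): the target is declared once, append flows write into it by its bare DLT name, and downstream datasets read it as LIVE.<name> rather than LIVE.<catalog>.<schema>.<name>. Reading LIVE together with the fully qualified three-part name, as in code block 3, appears to be what triggers the DATASET_NOT_DEFINED error above.

import dlt

# Declare the target streaming table once in the pipeline.
dlt.create_streaming_table(name="bb1123_loans")

# Append flows reference the target by its bare DLT name.
@dlt.append_flow(target="bb1123_loans", name="bb1123_loans_csv_ingestion")
def bb1123_loans_ingestion():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load("/Volumes/main/landing/bb1123_loans/")  # placeholder path
    )

# Other datasets in the same pipeline read the target with LIVE.<dataset name>,
# not the fully qualified catalog.schema.table name.
@dlt.table(name="bb1123_loans_checks")
def bb1123_loans_post_ingestion_checks():
    return spark.readStream.table("LIVE.bb1123_loans")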

