I have created a target table in the same DLT pipeline, but when I read that table in a different block of the notebook with LIVE.<table_path>, it is not able to read it.
Here is my code. Block 1: creating a streaming table.
# Define metadata tables
catalog = spark.conf.get("catalog")
entity_table_path = f"{catalog}.metadata.entity"
elements_table_path = f"{catalog}.metadata.elements"
user = spark.conf.get("user")
import sys
sys.path.append(f"/Workspace/Users/{user}/.bundle/wtd_ingestion_framework/dev/files/src/")
# Import necessary modules
import dlt
from pyspark.sql.types import StructType, StructField, StringType, DecimalType, IntegerType
from pyspark.sql.functions import col
from utils.dlt_utils import create_dlt_table, apply_schema_evolution
from core.metadata_management import apply_classification_tags, map_data_type
from utils.logging_handling import log_event, get_logger
# Initialize logger
logger = get_logger()
# Read metadata
entities = spark.read.table(entity_table_path)
elements = spark.read.table(elements_table_path)
# Iterate through each entity (table)
for entity_row in entities.collect():
    table_name = entity_row["Entity_Technical_Name"]
    target_catalog = entity_row["Target_Catalog"]
    target_schema = entity_row["Target_Schema"]
    table_path = f"{target_catalog}.{target_schema}.{table_name}"
    filtered_elements = elements.filter(elements["Source_feed_name"] == table_name).orderBy("Position")

    # Define schema using elements metadata
    schema = StructType([
        StructField(
            row["Entity_Technical_Name"],  # Adjusted to match column key
            map_data_type(row["Datatype"], row["Length"] if "Length" in row else None),
            row["Nullable"] == "yes"
        )
        for row in filtered_elements.collect()
    ])

    # Check if the table exists and is not empty
    try:
        spark.sql(f"DESCRIBE TABLE {table_path}")
        table_exists = True
        table_is_empty = spark.sql(f"SELECT COUNT(1) FROM {table_path}").collect()[0][0] == 0
    except Exception:
        table_exists = False
        table_is_empty = True

    # If the table exists and is not empty, use it as a streaming source
    if table_exists and not table_is_empty:
        dataframe = spark.readStream.table(table_path)
        create_dlt_table(
            dataframe=dataframe,
            table_name=table_name,
            schema=schema,
            table_type="streaming"
        )
        log_event(logger, f"Streaming table {table_name} initialized with existing data.", "INFO")
    # If the table doesn't exist or is empty, create an empty streaming table
    else:
        create_dlt_table(
            table_name=table_name,
            schema=schema,
            table_type="streaming"
        )
        log_event(logger, f"Created new empty streaming table {table_path}.", "INFO")

    # Apply the classification tags
    # apply_classification_tags(target_catalog, target_schema, table_name)
The part of the create_dlt_table function used in the above block (in a core.py file):
# If neither DataFrame nor input path is provided, create an empty table
else:
    if not schema:
        raise ValueError("Schema must be provided for empty tables without input_path or DataFrame.")

    @dlt.table(
        name=table_name,
        comment=f"Empty {table_type} DLT table created with schema."
    )
    def empty_table():
        log_event(logger, f"Creating empty {table_type} table {table_name}.", "INFO")
        if table_type == "streaming":
            # Create a dummy streaming DataFrame with the `rate` source
            streaming_df = spark.readStream.format("rate").option("rowsPerSecond", 1).load()
            # Map the `rate` source columns to match the schema with default null values
            empty_stream_df = streaming_df.select(
                *[lit(None).cast(field.dataType).alias(field.name) for field in schema.fields]
            ).where("1=0")  # Ensures the stream is empty
            return empty_stream_df
        else:
            # Create an empty DataFrame for a materialized view
            return spark.createDataFrame([], schema)

    log_event(logger, f"Empty {table_type} table {table_name} created successfully.", "INFO")
Code block 2: ingesting data from Auto Loader into the target table using append_flow and adding some extra columns.
# Load parameters
import dlt
target_catalog = spark.conf.get("catalog").lower()
target_schema = spark.conf.get("target_schema").lower()
target_table_name = spark.conf.get("target_table_name").lower()
data_file_path = spark.conf.get("data_file_path")
header = spark.conf.get("header")
file_format = spark.conf.get("file_format")
user = spark.conf.get("user")
evolve_schema = spark.conf.get("evolve_schema")
mode = spark.conf.get("mode")
import sys
sys.path.append(f"/Workspace/Users/{user}/.bundle/wtd_ingestion_framework/dev/files/src/")
# Import necessary modules
from pyspark.sql.types import StructType, StructField
from utils.logging_handling import get_logger, log_event, log_error
from utils.dlt_utils import create_dlt_table
from core.metadata_management import map_data_type
from core.model_extensions import apply_model_extensions
from pyspark.sql import SparkSession
# Initialize logger and Spark session
logger = get_logger()
log_event(logger, "Starting file ingestion process.", "INFO")
table_path = f"{target_catalog}.{target_schema}.{target_table_name}"
# Read element metadata (same metadata table as in code block 1)
elements = spark.read.table(f"{target_catalog}.metadata.elements")
filtered_elements = elements.filter(elements["Source_feed_name"] == "BB1123_loans").orderBy("Position")
# Define schema using elements metadata
schema = StructType([
    StructField(
        row["Entity_Technical_Name"],  # Adjusted to match column key
        map_data_type(row["Datatype"], row["Length"] if "Length" in row else None),
        row["Nullable"] == "yes"
    )
    for row in filtered_elements.collect()
])
log_event(logger, f"Schema: {schema}", "INFO")
# @Dlt.table(name ="test")
# def test_table():
# return spark.read.table(table_path)
header="true"
evolve_schema=True
schema_hints = None
if schema and evolve_schema:
    schema_hints = ",".join([f"{field.name} {field.dataType.simpleString()}" for field in schema.fields])
    # schema_hints = {field.name: field.dataType.simpleString() for field in schema.fields}
    log_event(logger, f"Schema hints: {schema_hints}", "INFO")
# Determine schema evolution mode based on header and evolve_schema
if header == "true":
    infer_column_types = "true"  # Infer schema from file
    schema_evolution_mode = "addNewColumns" if evolve_schema else "rescue"
else:
    infer_column_types = "false"  # Do not infer schema
    schema_evolution_mode = "addNewColumns" if evolve_schema else "rescue"
# Define the cloud file configuration
cloudfile = {
    "cloudFiles.format": file_format,
    "cloudFiles.allowOverwrites": "false",
    "cloudFiles.inferColumnTypes": infer_column_types,
    "cloudFiles.schemaLocation": f"{data_file_path}/_schema/{target_table_name}/",
    "cloudFiles.schemaEvolutionMode": schema_evolution_mode,
    "cloudFiles.schemaHints": schema_hints,
    "cloudFiles.useNotifications": "false",
    "cloudFiles.includeExistingFiles": "false"
}
file_specific_config = {
    "header": header,    # Pass the header option directly
    "delimiter": "|",    # Set pipe as the delimiter
    "quote": "\"",       # Handle quoted fields
    "escape": "\\"       # Use backslash as escape character
}
table_type = "Streaming"
@dlt.append_flow(
    target=target_table_name,
    name=f"{target_table_name}_{file_format}_ingestion",
    # name=f"{target_table_name}",
    comment=f"{mode} Mode {table_type} DLT table created/updated from {data_file_path}."
)
def source_to_bronze():
    input_df = (spark
        .readStream
        .format("cloudFiles")
        .options(**cloudfile)
        .options(**file_specific_config)
        .option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}")
        .load(data_file_path))
    # return df.writeStream.trigger(availableNow=True if mode == "batch" else False)
    input_df = apply_model_extensions(input_df, target_catalog, target_schema, target_table_name)
    log_event(logger, f"Inferred columns: {input_df.columns}", "INFO")
    # Write to Delta
    input_df.writeStream \
        .format("delta") \
        .option("mergeSchema", "true") \
        .outputMode("append") \
        .option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}") \
        .start(table_path)
    return input_df

log_event(logger, f"DLT {table_type} table {target_table_name} created or updated from input path.", "INFO")
Code block 3: doing some post-ingestion checks using append_flow again. I want to use expectations on this block as well, but cannot, because expectations are not allowed in append_flow (please provide a solution or an idea for this as well; a possible workaround is sketched after this code block).
import sys
import dlt
from delta.tables import DeltaTable
from pyspark.sql.functions import expr, lit, count
# Add your module paths
user = spark.conf.get("user")
sys.path.append(f"/Workspace/Users/{user}/.bundle/wtd_ingestion_framework/dev/files/src/")
# Import necessary modules
from utils.logging_handling import get_logger, log_event
from utils.dlt_utils import create_dlt_table
from utils.helper import get_rules, get_primary_key
# Initialize logger
logger = get_logger()
log_event(logger, "Started Post Ingestion Checks", "INFO")
# Get table details
target_table_name = spark.conf.get("target_table_name")
target_catalog = spark.conf.get("catalog")
target_schema = spark.conf.get("target_schema")
data_file_path = spark.conf.get("data_file_path")
table_path = f"{target_catalog}.{target_schema}.{target_table_name}"
log_event(logger, f"DLT table {table_path}", "INFO")
# @dlt.expect_all(get_rules(table_path))
@dlt.append_flow(
    target=f"{target_table_name}",
    name=f"{target_table_name}_post_ingestion_checks"
)
def validation_table():
    # Read the source Delta table
    record_count = spark.read.table(f"LIVE.{table_path}").count()
    log_event(logger, f"Record count: {record_count}", "INFO")
    df = spark.readStream.table(f"LIVE.{table_path}")
    # df.withColumn("record_count", lit(record_count))

    # Add quarantine flag using the dynamic record_count
    rules = get_rules(table_path)
    log_event(logger, f"Quarantine rules: {list(rules.values())}", "INFO")
    if len(rules) == 1:
        quarantine_rules = f"NOT({list(rules.values())[0]})"
    else:
        quarantine_rules = f"NOT({' OR '.join(rules.values())})"
    log_event(logger, f"Quarantine rules applied: {quarantine_rules}", "INFO")
    df_quarantined = df.withColumn("record_count", lit(record_count)).withColumn("Quarantined_flag", expr(quarantine_rules))
    df_quarantined = df_quarantined.drop("record_count")

    # Write the updated DataFrame
    query = (
        df_quarantined.writeStream.format("delta")
        .option("mergeSchema", "true")
        .option("checkpointLocation", f"{data_file_path}/_checkpoints/{target_table_name}")
        .outputMode("append")
        .start(table_path)
    )
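On the expectations question above: since @dlt.append_flow does not accept expectation decorators, one workaround is to attach the rules to the flow's target when it is declared with dlt.create_streaming_table(), which takes expect_all / expect_all_or_drop / expect_all_or_fail dictionaries. A minimal sketch follows; the table name, paths, and rules dict are placeholders, not the framework's real get_rules() output:

import dlt

# Hypothetical rules; in this framework they would come from get_rules(table_path)
rules = {"valid_loan_id": "Loan_ID IS NOT NULL"}

# Declare the append-flow target once and attach the expectations to it
dlt.create_streaming_table(
    name="bb1123_loans",
    comment="Bronze loans table with post-ingestion expectations.",
    expect_all_or_drop=rules  # use expect_all to only record metrics instead of dropping rows
)

# Append flows then write into that target; the expectations are evaluated on the target table
@dlt.append_flow(target="bb1123_loans", name="bb1123_loans_file_ingestion")
def bb1123_loans_file_ingestion():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .option("cloudFiles.schemaLocation", "/Volumes/placeholder/_schema/bb1123_loans")  # hypothetical path
        .load("/Volumes/placeholder/bb1123_loans")  # hypothetical path
    )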
Getting this error:
py4j.protocol.Py4JJavaError: Traceback (most recent call last):
File "/Workspace/Users/ashraf@wtdanalytics.com/.bundle/wtd_ingestion_framework/dev/files/src/notebooks/load_metadata/task_schema_changes", cell 6, line 31, in validation_table
record_count = spark.read.table(f"LIVE.{table_path}").count()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
py4j.protocol.Py4JJavaError: An error occurred while calling z:com.databricks.pipelines.Pipeline.readBatchInput.
: com.databricks.pipelines.common.errors.DLTSparkException: [DATASET_NOT_DEFINED] Failed to read dataset 'accounts_and_customer.bronze.bb1123_loans'. This dataset is not defined in the pipeline.
If this table is managed by another pipeline, then do not use `dlt.read` / `dlt.readStream` to read the table or prepend the name with the LIVE keyword.
Hi @ashraf1395,
The error you are encountering ([DATASET_NOT_DEFINED] Failed to read dataset 'accounts_and_customer.bronze.bb1123_loans'. This dataset is not defined in the pipeline.) indicates that the dataset accounts_and_customer.bronze.bb1123_loans is not defined within the Delta Live Tables (DLT) pipeline.
Here are a few steps to troubleshoot this issue:
Define the Dataset in the Pipeline: Make sure the dataset is declared within your DLT pipeline. For example:
import dlt

@dlt.table
def bb1123_loans():
    return spark.read.format("delta").load("/path/to/your/dataset")
Use the Correct Dataset Name: When referencing datasets within the same pipeline, prepend the dataset name with LIVE. For example:
df = spark.readStream.table("LIVE.bb1123_loans")
Pipeline Configuration: Ensure that the pipeline configuration includes the necessary datasets. If the dataset is managed by another pipeline, do not use dlt.read or dlt.readStream to read it. Instead, use spark.read.table or spark.readStream.table with the fully qualified table name.
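To make that last distinction concrete, a minimal sketch (the table names are taken from this thread and should be treated as placeholders):

import dlt

@dlt.table(name="loans_enriched")
def loans_enriched():
    # Dataset defined in THIS pipeline: reference it through the LIVE schema, table name only
    same_pipeline_df = spark.readStream.table("LIVE.bb1123_loans")
    # Table managed outside this pipeline: use the fully qualified name, with no LIVE prefix
    # external_df = spark.read.table("accounts_and_customer.bronze.some_reference_table")  # hypothetical
    return same_pipeline_df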
Can't we use LIVE.table_name on a target DLT table with the @dlt.append_flow decorator?
If yes, can you share the code, because when I tried it I got an error.
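A minimal sketch of the pattern asked about here, assuming all three datasets are declared in the same pipeline: LIVE.<table_name> does work on an append_flow target, as long as the target streaming table is defined in that pipeline (for example with dlt.create_streaming_table) and the LIVE reference uses only the table name, not the fully qualified catalog.schema.table path shown in the error above. All names below are placeholders:

import dlt

# 1. Declare the target streaming table inside the pipeline
dlt.create_streaming_table(name="bb1123_loans")

# 2. An append flow feeds the target
@dlt.append_flow(target="bb1123_loans", name="bb1123_loans_ingestion")
def bb1123_loans_ingestion():
    return spark.readStream.table("some_catalog.some_schema.raw_loans")  # hypothetical source table

# 3. Downstream datasets read the append-flow target with LIVE.<table_name>
#    (table name only -- not catalog.schema.table -- after the LIVE keyword)
@dlt.table(name="bb1123_loans_checks")
def bb1123_loans_checks():
    return spark.readStream.table("LIVE.bb1123_loans")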