Friday
I'm trying to create a pipeline containing a view from a federated source. In this case, I'd like to just create materialized views from the federation and schedule the pipeline for execution. If I define a pipeline with only something like the following:
I get the following error:
com.databricks.pipelines.execution.core.ExecutionFailedException: [DLT ERROR CODE: NO_TABLES_IN_PIPELINE] No tables are defined by the libraries of this pipeline. This error usually occurs when flows defined without defining the table they target, or when all top-level definitions are views.
Is it not possible to create a pipeline with just views or is there some other way I should be doing this?
Friday
Hi @aliacovella,
The error message you received indicates that no tables are defined by the libraries of the pipeline, which typically occurs when all top-level definitions are views.
In Delta Live Tables, a pipeline must define at least one table (a materialized view or a streaming table); views alone are not enough. DLT views are temporary: they are computed inside the pipeline but not published as datasets, so a pipeline made only of views has no output to produce.
To resolve this issue, you can define a table in your pipeline alongside your views. Here is an example of how you can modify your pipeline to include a table definition:
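import dlt

# Define a table. Returning a batch DataFrame like this publishes it as a materialized view.
@dlt.table
def users_table():
    # `spark` is provided by the DLT runtime, so it needs no import here.
    return spark.read.table("some_catalog.public.users")

# Define a view based on the table (temporary, not published)
@dlt.view
def users_view():
    return dlt.read("users_table")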
In this example, users_table is defined as a table, and users_view is a view that references the users_table. This ensures that your pipeline has at least one table definition, which should resolve the error you encountered.
Saturday
Thanks, that makes sense. So if I create a table against a federated data source, when changes occur in the source table, does it automatically handle the change data capture, or does it perform a table scan on the federated source to determine changes and update the DLT table?
Saturday
From what I can see, it looks like it queries all the records again. I tried looking into the apply_changes API, but that seems to require a streaming table, and it appears that streaming tables are not supported from JDBC sources; in this case, the source happens to be a Postgres database. Is the only way to support CDC to stream from something like Kinesis?
Saturday
Hi @aliacovella,
That is correct:
CDC with Delta Live Tables: The apply_changes API is designed to simplify CDC with Delta Live Tables by processing changes from a change data feed (CDF). This API requires a streaming table to function.
Streaming Table Requirement: Since streaming tables are not supported from JDBC sources, you cannot directly use a Postgres database as a streaming source for the apply_changes API.
Alternative Streaming Sources: To implement CDC, you would need to use a streaming source that is supported by Databricks, such as Amazon Kinesis. This would involve setting up a Kinesis stream to capture changes from your Postgres database and then using this stream as the source for your Delta Live Tables pipeline.
Configuration and Metrics: When using Kinesis, you can configure options such as maxFetchDuration and minFetchPeriod to tune streaming query performance. Kinesis also provides metrics such as avgMsBehindLatest, maxMsBehindLatest, and minMsBehindLatest for monitoring the streaming query's progress.
Saturday
Informative.
Sunday
Thanks, I originally tried the Kinesis route, and it worked well, but I was hoping there was a simpler solution. I'll go with that then.
Sunday
No problem, if you have any other questions let me know!
yesterday
Thanks, I do have one more question. In the following scenario:
A new migration via AWS Database Migration Service (DMS) has been configured, with CDC output going to Kinesis
A new DLT pipeline has been configured to read from the Kinesis stream
An initial load of potentially millions of records needs to occur
Is the recommended approach to let the DLT pipeline process the entire stream, or is there a more efficient approach I should consider?
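In the scenario above, one Kinesis stream carries both the DMS full load and the ongoing CDC changes. Here is a minimal sketch of telling the two apart. It assumes the JSON layout AWS DMS documents for Kinesis targets: a `data` object plus a `metadata` object with a `record-type` field, and an `operation` field of `load`, `insert`, `update`, or `delete`. The function name and the `id` key column are hypothetical, and the field names should be checked against records from your own stream.

```python
import json
from typing import Any, Optional, Tuple

# Assumed DMS `operation` values: "load" for full-load rows, the rest for CDC.
FULL_LOAD_OPS = {"load"}
CDC_OPS = {"insert", "update", "delete"}


def classify_dms_record(
    raw: bytes, key_column: str = "id"
) -> Optional[Tuple[str, str, Any]]:
    """Return (phase, operation, key) for a DMS data record, else None.

    phase is "full_load" or "cdc". `key_column` is a hypothetical
    primary-key column name; adjust it to the source table.
    """
    msg = json.loads(raw)
    meta = msg.get("metadata", {})
    if meta.get("record-type") != "data":
        return None  # assumed non-data records, e.g. control/DDL messages
    op = meta.get("operation")
    if op in FULL_LOAD_OPS:
        phase = "full_load"
    elif op in CDC_OPS:
        phase = "cdc"
    else:
        return None
    return phase, op, msg.get("data", {}).get(key_column)


# Usage with a hand-built record shaped like the assumed DMS format.
sample = json.dumps({
    "data": {"id": 42, "name": "Ana"},
    "metadata": {"record-type": "data", "operation": "load",
                 "schema-name": "public", "table-name": "users"},
}).encode()
print(classify_dms_record(sample))  # ('full_load', 'load', 42)
```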