DLT target schema - get value during run time

GuMart
New Contributor III

Hi,

I would like to know if it is possible to get the target schema programmatically inside a DLT pipeline, i.e., the value set in the DLT pipeline settings under Destination > Target schema.

I want to run more idempotent pipelines. For example, my target table has the fields reference_date, customer_id, val1, val2, etc.

BEFORE writing to the target table, I would like to get the max reference_date currently in it, so I can query the source tables appropriately.

Currently, DLT does not allow creating another live table that reads from the target table beforehand.

I could use 'spark.table()', but to make it fully automatic, I would like to get the target schema at run time, e.g.:

import dlt
from pyspark.sql import functions as F

target_table_name = 'whatevertable'

@dlt.table(
    name=target_table_name
)
def update_target():
    # get_dlt_target_schema() is the function I am looking for:
    # it should return the target schema configured in the pipeline settings
    max_date = (
        spark
        .table(get_dlt_target_schema() + '.' + target_table_name)
        .select(F.max('reference_date'))
    )

    ...

Thank you

5 REPLIES

Kaniz
Community Manager

Hi @Gustavo Martins, it's possible to get the target schema programmatically in a DLT notebook by reading the Delta table using PySpark.

Here's a way to achieve this by defining a function that gets the target schema from the target Delta table:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def get_dlt_target_schema(target_table_name):
    # Read the target Delta table (load() expects the path to the table, not just its name)
    target_table_df = spark.read.format("delta").load(target_table_name)

    # Get the schema of the target table
    target_schema = target_table_df.schema

    return target_schema

Then, you can use this function in your update_target() function:

import dlt
from pyspark.sql.functions import max as F_max

target_table_name = 'whatevertable'

@dlt.table(
    name=target_table_name
)
def update_target():
    target_schema = get_dlt_target_schema(target_table_name)

    max_date = (
        spark
        .table(target_table_name)
        .select(F_max('reference_date'))
    )

    # ...

However, there is a limitation to this approach. Since you are trying to read the same table you are defining using the @dlt.table decorator, you might run into issues when trying to read and write the same table within the same pipeline.

A possible workaround is to query the maximum reference date from the target table outside of the DLT pipeline: read the target table using PySpark (or another method) and pass the max reference date to the DLT pipeline as a parameter.

This way, you can avoid reading and writing the same table within the same pipeline.
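
For example, a minimal sketch of that pattern could look like the following. The configuration key 'mypipeline.max_reference_date' and the table names are placeholders chosen for illustration, not anything DLT provides out of the box:

# --- Outside the DLT pipeline (e.g., a notebook or job task that runs before the pipeline) ---
# Compute the current max reference_date from the target table and hand it to the pipeline
# as a configuration parameter (set in the pipeline settings or via the Pipelines API).
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

max_reference_date = (
    spark.table("hive_metastore.my_target_schema.whatevertable")  # fully qualified target table (placeholder)
    .agg(F.max("reference_date").alias("max_reference_date"))
    .first()["max_reference_date"]
)
# e.g., store str(max_reference_date) under the key "mypipeline.max_reference_date"
# in the pipeline configuration before triggering the update.

# --- Inside the DLT pipeline ---
# Read the parameter back from the pipeline configuration and use it to filter the sources.
import dlt

@dlt.table(name="whatevertable")
def update_target():
    max_ref = spark.conf.get("mypipeline.max_reference_date")  # placeholder config key
    return (
        spark.table("my_source_schema.my_source_table")          # placeholder source table
        .where(F.col("reference_date") > F.lit(max_ref))
    )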

Remember to replace the 'whatevertable' placeholder with the appropriate path to your Delta table.

GuMart
New Contributor III

Hi @Kaniz Fatma,

Thank you for your reply.

About the limitations: I know, and that is exactly why I want to read the table with the 'spark.table()' method.

And to be clear, what I mean by 'schema' is:

hive_metastore.<this_schema>.<the_target_table>

But what you suggested, 'spark.table(<the_target_table>)', won't work: if I don't use the full name (i.e., <this_schema>.<the_target_table>), Spark will assume the default schema, which is not the target schema (<this_schema>).

I could use the function 'current_database()', but again, that returns the default schema, not the target schema of this DLT pipeline.
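
To illustrate (a sketch with hypothetical names: 'my_target_schema' stands for <this_schema> and 'whatevertable' for <the_target_table>; it assumes a Databricks context where `spark` is available):

# Without qualification, Spark resolves the name against the current (default) schema,
# which is generally NOT the DLT target schema:
df_default = spark.table("whatevertable")

# With the fully qualified name, the lookup hits the intended target table,
# but the schema name has to be known at run time:
df_target = spark.table("hive_metastore.my_target_schema.whatevertable")

# current_database() likewise reports the default schema, not the pipeline's target schema:
spark.sql("SELECT current_database()").show()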

Anonymous
Not applicable (Accepted Solution)

@Gustavo Martins:

I see, thank you for the clarification. In that case, you can use spark.conf.get("spark.sql.warehouse.dir") to get the location of the default database directory, which typically lives on the Databricks File System (DBFS). From there, you can construct the path to the table's metadata file, which includes the schema information.

Here's an example:

import dlt
from pyspark.sql import functions as F

target_table_name = 'whatevertable'

@dlt.table(
    name=target_table_name
)
def update_target():
    dbfs_root = spark.conf.get("spark.sql.warehouse.dir")
    target_table_path = f"{dbfs_root}/{target_table_name}"
    target_table_metadata = spark.read.format("parquet").load(f"{target_table_path}/_metadata")
    target_table_schema = target_table_metadata.schema.simpleString()

    max_date = (
        spark
        .table(f"`{target_table_schema}`.`{target_table_name}`")
        .select(F.max('reference_date'))
    )

    ...

This code reads the _metadata file of the target table to get its schema information, which is then used to construct the full target table path. You can then use the full schema-qualified table name to access the table with the spark.table() method.

Note that the spark.sql.warehouse.dir configuration property may not be set in some environments, in which case you may need to manually specify the root directory of your default database.
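
If that is a concern in your environment, spark.conf.get() also accepts a default value, so you can fall back to a path of your choosing (a sketch; the fallback below assumes the conventional Hive warehouse location, so adjust it to your workspace):

# Returns the configured warehouse dir, or the given fallback when the property is not set.
dbfs_root = spark.conf.get("spark.sql.warehouse.dir", "dbfs:/user/hive/warehouse")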

GuMart
New Contributor III

Thank you @Suteja Kanuri,

Looks like your solution is working, thank you.

Regards,

Anonymous
Not applicable

Perfect! I am glad this worked out for you, @Gustavo Martins.
