04-18-2023 06:34 AM
Hi,
I would like to know if it is possible to get the target schema programmatically inside a DLT pipeline, i.e. the value set under Destination > Target schema in the DLT pipeline settings.
I want to make my pipelines more idempotent. For example, my target table has the fields reference_date, customer_id, val1, val2, etc.
BEFORE writing to it, I would like to get the max reference_date currently in the target table, so that I can query the source tables accordingly.
Currently DLT does not allow creating another live table that reads from it beforehand.
I could use 'spark.table', but to make this fully automatic I need the target schema name.
target_table_name = 'whatevertable'

@dlt.table(
    name = target_table_name
)
def update_target():
    max_date = (
        spark
        .table(get_dlt_target_schema() + '.' + target_table_name)
        .select(F.max('reference_date'))
    )
    ....
    ....
    ....
Thank you
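For the incremental pattern described in the question (read the current max reference_date from the target, then restrict the source query), the filtering step can be sketched in plain Python as below. This is a minimal sketch under assumptions: reference_date is a DATE column, max_date is the scalar returned by something like .select(F.max('reference_date')).first()[0] (which is None when the target is empty), and incremental_filter is a hypothetical helper name. Using '>' rather than '>=' (which would reprocess the latest date) is a choice the question leaves open.

```python
import datetime
from typing import Optional


def incremental_filter(max_date: Optional[datetime.date], column: str = "reference_date") -> str:
    """Build a Spark SQL predicate string selecting source rows newer than max_date.

    Hypothetical helper; assumes `column` is a DATE column.
    """
    if max_date is None:
        # Empty target table: no lower bound, so every source row qualifies.
        return "true"
    # Typed DATE literal; date.isoformat() yields 'YYYY-MM-DD'.
    return f"`{column}` > DATE '{max_date.isoformat()}'"


if __name__ == "__main__":
    print(incremental_filter(None))
    print(incremental_filter(datetime.date(2023, 4, 1)))
```

Inside the DLT function, the returned string could be passed to DataFrame.where, e.g. source_df.where(incremental_filter(max_date)), where source_df stands for whatever source DataFrame you read.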
- Labels: DLT, DLTSchema, Target Schema
Accepted Solutions
04-24-2023 08:56 PM
@Gustavo Martins :
I see, thank you for the clarification. In that case, you can use the spark.conf.get("spark.sql.warehouse.dir") method to get the location of the default database directory, which on Databricks is typically dbfs:/user/hive/warehouse. From there, you can construct the path to the table's metadata file, which includes the table's schema information.
Here's an example:
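import dlt
from pyspark.sql import functions as F

target_table_name = 'whatevertable'

@dlt.table(
    name=target_table_name
)
def update_target():
    # Default database directory (on Databricks, typically dbfs:/user/hive/warehouse)
    dbfs_root = spark.conf.get("spark.sql.warehouse.dir")
    target_table_path = f"{dbfs_root}/{target_table_name}"
    # Assumes a Parquet _metadata summary file exists under the table's directory
    target_table_metadata = spark.read.format("parquet").load(f"{target_table_path}/_metadata")
    # simpleString() renders the column schema as a string, e.g. "struct<reference_date:date,...>"
    target_table_schema = target_table_metadata.schema.simpleString()
    # .first()[0] turns the one-row aggregate into a Python value
    max_date = (
        spark
        .table(f"`{target_table_schema}`.`{target_table_name}`")
        .select(F.max('reference_date'))
        .first()[0]
    )
    ...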
This code reads the target table's _metadata file to get its schema information, which is then used to build a schema-qualified table name. You can then pass that fully qualified name to the spark.table() method.
Note that the spark.sql.warehouse.dir configuration property may not be set in some environments, in which case you may need to manually specify the root directory of your default database.
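If spark.sql.warehouse.dir is not set, one option is to fall back to an explicit default. The sketch below assumes the usual Hive layout for managed tables (tables of the default database directly under the warehouse directory, tables of any other database under <database>.db/) and the usual Databricks default of dbfs:/user/hive/warehouse. managed_table_path is a hypothetical helper, conf can be spark.conf or anything with a get(key, default) method, and tables created with an explicit storage location will not be found this way.

```python
# Assumed fallback: the usual Databricks Hive warehouse location.
DEFAULT_WAREHOUSE = "dbfs:/user/hive/warehouse"


def managed_table_path(conf, database: str, table: str) -> str:
    """Hypothetical helper: directory of a managed table under the Hive warehouse layout."""
    # conf.get(key, default) returns the default when the key is unset.
    warehouse = conf.get("spark.sql.warehouse.dir", DEFAULT_WAREHOUSE).rstrip("/")
    if database == "default":
        # Tables of the default database sit directly under the warehouse directory.
        return f"{warehouse}/{table}"
    # Other databases get their own <name>.db subdirectory.
    return f"{warehouse}/{database}.db/{table}"


if __name__ == "__main__":
    print(managed_table_path({}, "default", "whatevertable"))
    print(managed_table_path({"spark.sql.warehouse.dir": "dbfs:/mnt/wh/"}, "sales", "whatevertable"))
```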
04-19-2023 11:17 PM
Hi @Kaniz Fatma,
Thank you for your reply.
I know about the limitations; that is why I want to read the table with the 'spark.table()' method.
To be clear, what I mean by 'schema' is:
hive_metastore.<this_schema>.<the_target_table>
What you suggested, 'spark.table(<the_target_table>)', won't work: if I don't use the fully qualified name (i.e., <this_schema>.<the_target_table>), it resolves against the default schema, which is not the target schema (<this_schema>).
I could use the 'current_database()' function, but again, it returns the default schema, not the target schema of this DLT pipeline.
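Since neither the default schema nor current_database() reflects the pipeline's target, one workaround (not a built-in DLT call) is to hand the schema name to the code yourself: add a key to the pipeline's Configuration settings holding the same value as the target schema, and read it with spark.conf.get. The key name mypipeline.target_schema and the helpers below are hypothetical, and the value has to be kept in sync with the target schema by hand.

```python
from typing import Optional

# Hypothetical key, added manually under the pipeline's Configuration settings.
TARGET_SCHEMA_KEY = "mypipeline.target_schema"


def quote_ident(name: str) -> str:
    # Spark SQL backtick quoting; an embedded backtick is escaped by doubling it.
    return "`" + name.replace("`", "``") + "`"


def target_table_fqn(conf, table: str, catalog: Optional[str] = "hive_metastore") -> str:
    """Hypothetical helper: fully qualified, quoted name of a table in the target schema.

    conf is spark.conf (or any object with get(key, default)).
    """
    schema = conf.get(TARGET_SCHEMA_KEY, None)
    if not schema:
        raise ValueError(f"Pipeline configuration key {TARGET_SCHEMA_KEY!r} is not set")
    parts = [catalog, schema, table] if catalog else [schema, table]
    return ".".join(quote_ident(p) for p in parts)


if __name__ == "__main__":
    fake_conf = {TARGET_SCHEMA_KEY: "sales"}  # stands in for spark.conf
    print(target_table_fqn(fake_conf, "whatevertable"))
```

Inside the pipeline this might be used as spark.table(target_table_fqn(spark.conf, target_table_name)). Quoting each part separately keeps names with special characters intact.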
04-27-2023 10:10 AM
Thank you @Suteja Kanuri,
It looks like your solution is working, thank you.
Regards,
04-27-2023 11:24 AM
Perfect! I am glad this worked out for you @Gustavo Martins .