01-15-2025 09:59 PM
I am using Python spark.readStream in a Delta Live Tables pipeline to read JSON data files from an S3 folder path. Each load is a daily snapshot of a very similar set of products showing changes in price and inventory. How do I distinguish and query each daily load of JSON products?
```python
import dlt
from datetime import datetime

folder_date = datetime.today().strftime('%Y-%m-%d')

@dlt.table(table_properties={'quality': 'bronze', 'delta.columnMapping.mode': 'name', 'delta.minReaderVersion': '2', 'delta.minWriterVersion': '5'})
def items_inventory_price():
    return (
        spark.readStream.format('cloudFiles')
        .option('cloudFiles.format', 'json')
        .option('delta.columnMapping.mode', 'name')
        .load(f's3://bucket/inventory/Item/{folder_date}')
    )
```
I was looking at `DESCRIBE HISTORY items_inventory_price` so I could use table versions, but that is not supported on streaming tables; the error message suggests switching to a SQL warehouse.
If I were able to add a date column to each data load I could separate the loads, or is there metadata that I can use?
Accepted Solutions
01-16-2025 11:45 PM
The problem was fixed by this import
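Assuming the fix was importing `lit` from `pyspark.sql.functions` and using it in place of `functions.col()` (which would explain the AnalysisException reported later in the thread), the corrected table definition would look roughly like this:

```python
import dlt
from datetime import datetime
from pyspark.sql.functions import lit  # assumed fix: lit(), not col()

folder_date = datetime.today().strftime('%Y-%m-%d')

@dlt.table(table_properties={'quality': 'bronze', 'delta.columnMapping.mode': 'name', 'delta.minReaderVersion': '2', 'delta.minWriterVersion': '5'})
def items_inventory_price():
    return (
        spark.readStream.format('cloudFiles')
        .option('cloudFiles.format', 'json')
        .load(f's3://bucket/inventory/Item/{folder_date}')
        # lit() stamps every row with the load date as a constant value,
        # rather than looking up a column named after the date
        .withColumn('ingestion_date', lit(folder_date))
    )
```

Each daily load can then be queried with a filter such as `WHERE ingestion_date = '2025-01-15'`.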
01-16-2025 04:33 AM
Hi @jb1z,
You can use the `withColumn` method to add a date column to your DataFrame. This column can store the date when the data was loaded. Update the `items_inventory_price` function to include the date column.
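A minimal illustration of the idea, assuming a notebook where `spark` is available (`ingestion_date` is just an example column name):

```python
from pyspark.sql.functions import current_date

# Every row gets a constant column holding the date it was processed.
df = spark.createDataFrame([('sku-1', 10), ('sku-2', 7)], ['item', 'inventory'])
df_with_date = df.withColumn('ingestion_date', current_date())
df_with_date.show()
```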
01-16-2025 03:42 PM
To be clear, I want to add a new date column so I can query the daily loads of inventory and products. I don't want to modify an existing column.
01-16-2025 01:02 PM
Thank you @Alberto_Umana for your response. The error message also mentioned a shared cluster. I was able to get access to `DESCRIBE HISTORY` by changing the Access Mode from Single User to Shared in the Compute configuration.
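For reference, a sketch of checking the history from a notebook on that cluster (the three-level table name is a placeholder for the pipeline's target catalog and schema):

```python
# Placeholder name; substitute the pipeline's target catalog and schema.
history = spark.sql('DESCRIBE HISTORY main.inventory.items_inventory_price')
history.select('version', 'timestamp', 'operation').show(truncate=False)
```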
01-16-2025 07:25 PM
The community forum is making my reply post disappear after I post; I have made 5 attempts.
I tried using `.withColumn('ingestion_date', functions.col(folder_date))` after `.load()`, but I am getting the error `AnalysisException ... a column or function param cannot be resolved`.