Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Query separate data loads from python spark.readStream

jb1z
Contributor

I am using python spark.readStream in a Delta Live Tables pipeline to read JSON data files from an S3 folder path. Each load is a daily snapshot of a very similar set of products, showing changes in price and inventory. How do I distinguish and query each daily load of JSON products?

 

import dlt
from datetime import datetime

# Folder name for today's snapshot
folder_date = datetime.today().strftime('%Y-%m-%d')

@dlt.table(table_properties={
    'quality': 'bronze',
    'delta.columnMapping.mode': 'name',
    'delta.minReaderVersion': '2',
    'delta.minWriterVersion': '5',
})
def items_inventory_price():
    return (
        spark.readStream.format('cloudFiles')
        .option('cloudFiles.format', 'json')
        .option('delta.columnMapping.mode', 'name')
        .load(f's3://bucket/inventory/Item/{folder_date}')
    )

 

I looked at `DESCRIBE HISTORY items_inventory_price` to use table versions, but this is not supported on streaming tables; the error message suggests switching to a SQL warehouse.

If I were able to add a date column to each data load I could separate the loads, or perhaps there is metadata I can use?

1 ACCEPTED SOLUTION

Accepted Solutions

jb1z
Contributor

The problem was fixed by adding this import:

from pyspark.sql import functions as F

and then using F.lit() instead of F.col():

.withColumn('ingestion_date', F.lit(folder_date))

Sorry, code formatting is not working at the moment.

View solution in original post

5 REPLIES 5

Alberto_Umana
Databricks Employee

Hi @jb1z,

You can use the withColumn method to add a date column to your DataFrame. This column will store the date the data was loaded; update the items_inventory_price function to include the new date column.

 

 

To be clear, I want to add a new date column so I can query the daily loads of inventory and product. I don't want to modify an existing column.

jb1z
Contributor

Thank you @Alberto_Umana for your response. The error message also mentioned a shared cluster; I was able to get access to `DESCRIBE HISTORY` by changing the Access Mode from Single User to Shared in the Compute configuration.

jb1z
Contributor

The community forum is making my reply disappear after I post; I have made 5 attempts.

I tried using .withColumn('ingestion_date', functions.col(folder_date)) after .load(), but I am getting the error AnalysisException ... a column or function param cannot be resolved.

