Data Engineering

sampleBy stream in DLT

lprevost
Contributor

I would like to create a `sampleBy` (the stratified version of `sample`) copy/clone of my Delta table. Ideally, I'd like to do this using DLT.

 

My source table grows incrementally each month as batch files are added and Auto Loader picks them up.

 

Ideally, I would like the sampled layer to grow incrementally as the source grows, rather than having to regenerate the whole table (the source table is very large). But I can't seem to find a way to do this with a stream.
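
For reference, this is the batch operation I'm trying to reproduce incrementally (table, column, and strata names are placeholders):

```python
# Stratified sample of a static Delta table: keep 10% of one stratum
# and 20% of another; strata missing from `fractions` are dropped.
df = spark.read.table("source_table")
sampled = df.sampleBy("strata_column",
                      fractions={"strata_value1": 0.1, "strata_value2": 0.2},
                      seed=42)
```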

1 REPLY

Sidhant07
Databricks Employee

You can create a stratified sample of your Delta table using the `sampleBy` function in Databricks. However, DLT does not support the `sampleBy` function directly. To work around this, you can create a notebook that uses `sampleBy` to build the stratified sample of your Delta table and then schedule and run that notebook alongside your pipeline.

To create a stratified sample of your delta table that grows incrementally, you can use the following approach:

1. Create a new Delta table that will store the stratified sample.
2. Use the `sampleBy` function to create an initial stratified sample of the source table.
3. Write the sampled data to the new Delta table.
4. Create a streaming query that reads new data from the source location using Auto Loader and applies `sampleBy` to each micro-batch (via `foreachBatch`, since sampling operations are not supported on streaming DataFrames).
5. Append each sampled micro-batch to the new Delta table.

Here's some sample code that demonstrates this approach. Because Spark does not support `sampleBy` on streaming DataFrames, the streaming step applies it inside `foreachBatch`:
```python
# Sampling fractions per stratum; strata values missing from this dict
# are dropped entirely by sampleBy.
fractions = {"strata_value1": 0.1, "strata_value2": 0.2}

# Steps 1-3: create an initial stratified sample of the source table
# and write it out; the first write creates the new Delta table.
sampled_data = (spark.read.table("source_table")
                .sampleBy("strata_column", fractions=fractions, seed=42))
sampled_data.write.format("delta").save("/path/to/sample/table")

# sampleBy is not supported on streaming DataFrames, so apply it to
# each micro-batch with foreachBatch instead.
def sample_batch(batch_df, batch_id):
    (batch_df
     .sampleBy("strata_column", fractions=fractions, seed=42)
     .write.format("delta")
     .mode("append")
     .save("/path/to/sample/table"))

# Steps 4-5: read new files from the source location with Auto Loader
# and append a stratified sample of each micro-batch to the sample table.
streaming_query = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "/path/to/schema/location")
    .load("/path/to/source/table")
    .writeStream
    .foreachBatch(sample_batch)
    .option("checkpointLocation", "/path/to/checkpoint/location")
    .start())
```
In this example, replace `/path/to/source/table` with the path to your source files, `/path/to/sample/table` with the path for the new Delta table holding the stratified sample, and `/path/to/schema/location` with the schema location for Auto Loader. Adjust the `fractions` dictionary to set the desired sampling fraction for each stratum.
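
One caveat: `sampleBy` drops rows whose stratum value is missing from `fractions`, so if the strata aren't all known up front you can build the dictionary from the data itself. A minimal sketch (the uniform 10% rate is just an example):

```python
# Build a fractions dict covering every stratum currently in the table,
# sampling each at 10%; strata absent from the dict would be dropped.
strata = (spark.read.table("source_table")
          .select("strata_column").distinct().collect())
fractions = {row["strata_column"]: 0.1 for row in strata}
```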

Note that this approach assumes that the new data added to the source table has the same schema as the existing data. If the schema changes, you may need to update the schema location for Auto Loader and/or modify the streaming query to handle the schema changes.
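
If you expect new columns to appear, one documented option is to extend the earlier sketch with Auto Loader's schema-evolution setting and Delta's `mergeSchema` write option:

```python
# Let Auto Loader evolve the inferred schema when new columns appear,
# and let the Delta write merge those columns into the sample table.
stream = (spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "parquet")
    .option("cloudFiles.schemaLocation", "/path/to/schema/location")
    .option("cloudFiles.schemaEvolutionMode", "addNewColumns")
    .load("/path/to/source/table"))

def sample_batch(batch_df, batch_id):
    (batch_df
     .sampleBy("strata_column", fractions=fractions, seed=42)
     .write.format("delta")
     .mode("append")
     .option("mergeSchema", "true")
     .save("/path/to/sample/table"))
```

With `addNewColumns`, Auto Loader fails the stream when it detects a new column and applies the updated schema on restart, so the job should be configured to restart on failure.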
