"approxQuantile" not working as part of a delta live table workflow pipeline.
12-05-2022 09:01 AM
I am trying to compute outliers using approxQuantile on a DataFrame. It works fine in a Databricks notebook, but the call doesn't work correctly when it is part of a Delta Live Tables pipeline. (This is Python.) Here is the code that isn't working as part of the pipeline.
quantiles = session_agg_df.approxQuantile('duration_minutes', [0.25, 0.75], 0)
Q1 = quantiles[0] # <-- Fails as part of dlt pipeline, but works in notebook
Q3 = quantiles[1] # <-- Fails as part of dlt pipeline, but works in notebook
The pipeline fails with "IndexError: list index out of range". Again, it works just fine in a notebook on the same data.
(I am trying to look for outliers in my pipeline using a very basic +/- 1.5*IQR approach.)
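For readers unfamiliar with the rule, here is a minimal pure-Python sketch of the +/- 1.5*IQR fences on a made-up list of durations. The helper name `iqr_limits` and the sample values are illustrative assumptions, not taken from the pipeline, and the quartile interpolation used by `statistics.quantiles` may differ from what a given Spark function returns.

```python
import statistics

def iqr_limits(values, k=1.5):
    """Return (lower_limit, upper_limit) for the k * IQR outlier rule."""
    # method="inclusive" interpolates linearly between the sorted data points.
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    iqr = q3 - q1
    return q1 - k * iqr, q3 + k * iqr

# Hypothetical session durations in minutes.
durations = [5, 7, 8, 9, 10, 11, 12, 60]
lower, upper = iqr_limits(durations)

# Anything outside the fences is flagged as an outlier.
outliers = [d for d in durations if d < lower or d > upper]
print(lower, upper, outliers)
```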
Also, if anybody has any suggestions on best practices for writing and debugging dlt pipelines, I'd love to hear them!
Thanks,
Miles Porter
Lead Data Scientist
Trimble, Inc.
12-05-2022 09:09 AM
Because approxQuantile returns a Python list rather than a DataFrame, it cannot work on a stream, especially with dlt, where each table is effectively a CREATE TABLE AS SELECT statement.
12-05-2022 09:38 AM
Thanks for the quick answer, Hubert. Could you provide a bit more information? Is there a way to compute quantiles on columns in a dlt workflow?
Thanks!
Miles Porter
12-05-2022 11:59 AM
try that
There is also a pandas-on-Spark (PySpark pandas) function, but I haven't used pandas on Spark with dlt.
12-05-2022 12:24 PM
Good recommendation. I was able to do something similar that appears to work.
from pyspark.sql import functions as f

# Step 2. Flag outlier sessions based on duration_minutes
# Each selectExpr yields a one-row DataFrame; joining without a key attaches that value to every row.
lc = session_agg_df.selectExpr("percentile(duration_minutes, 0.25) lower_quartile")
session_agg_df = session_agg_df.join(lc, how="outer")
uc = session_agg_df.selectExpr("percentile(duration_minutes, 0.75) upper_quartile")
session_agg_df = session_agg_df.join(uc, how="outer")
# IQR fences: lower quartile - 1.5 * IQR and upper quartile + 1.5 * IQR
session_agg_df = session_agg_df.withColumn('iqr', session_agg_df['upper_quartile'] - session_agg_df['lower_quartile'])
session_agg_df = session_agg_df.withColumn('lower_limit', session_agg_df['lower_quartile'] - (1.5 * session_agg_df['iqr']))
session_agg_df = session_agg_df.withColumn('upper_limit', session_agg_df['upper_quartile'] + (1.5 * session_agg_df['iqr']))
# 1 when duration_minutes falls outside the fences, otherwise 0
session_agg_df = session_agg_df.withColumn('is_outlier', f.when(
    (session_agg_df['duration_minutes'] < session_agg_df['lower_limit']) |
    (session_agg_df['duration_minutes'] > session_agg_df['upper_limit']), 1).otherwise(0))
I am sure there are more optimal ways of doing this, but the above does appear to flag the outliers (based on the IQR method) on my data. Posting in case anyone else gets stuck on this.
Miles

