Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Query to calculate cost of task from each job by day

dndeng
New Contributor II

I am trying to find the cost per task in each job for every daily execution, but I am currently getting very large numbers due to duplicates. Can someone help me?

WITH workspace AS (
  SELECT
    account_id,
    workspace_id,
    workspace_name,
    workspace_url,
    status,
    workspace_name AS workspace_full_name  -- aliased so the later workspace_full_name reference resolves (was a duplicate workspace_name)
  FROM system.access.workspaces_latest
),
usage_with_ws_filtered_by_date AS (
  SELECT
    u.*,
    w.workspace_name,
    w.workspace_url,
    w.workspace_full_name
  FROM system.billing.usage u
  INNER JOIN workspace w ON u.workspace_id = w.workspace_id
  WHERE u.billing_origin_product = 'JOBS'
    AND u.usage_date BETWEEN DATE_ADD(CURRENT_DATE(), -30) AND CURRENT_DATE()
),
task_usage AS (
  SELECT
    u.workspace_id,
    u.workspace_name,
    u.workspace_url,
    u.workspace_full_name,
    u.usage_metadata.job_id,
    u.usage_metadata.job_run_id AS run_id,
    t.task_key,
    t.change_time AS task_change_time,
    u.usage_start_time,
    u.usage_end_time,
    u.usage_quantity,
    u.sku_name,
    u.identity_metadata.run_as AS user,
    u.usage_date
  FROM usage_with_ws_filtered_by_date u
  INNER JOIN system.lakeflow.job_tasks t
    ON u.workspace_id = t.workspace_id
    AND u.usage_metadata.job_id = t.job_id
),
task_costs AS (
  SELECT
    tu.*,
    lp.pricing.default AS unit_price,
    tu.usage_quantity * lp.pricing.default AS cost
  FROM task_usage tu
  LEFT JOIN system.billing.list_prices lp
    ON tu.sku_name = lp.sku_name
    AND tu.usage_start_time >= lp.price_start_time
    AND (tu.usage_end_time <= lp.price_end_time OR lp.price_end_time IS NULL)
    AND lp.currency_code = 'USD'
)
SELECT
  tc.workspace_id,
  tc.workspace_name,
  tc.workspace_url,
  tc.user,
  tc.job_id,
  tc.run_id,
  tc.task_key,
  tc.task_change_time,
  tc.usage_start_time,
  tc.usage_end_time,
  tc.usage_date,
  SUM(tc.cost) AS total_cost
FROM task_costs tc
GROUP BY
  tc.workspace_id, tc.workspace_name, tc.workspace_url, tc.user, tc.job_id, tc.run_id,
  tc.task_key, tc.task_change_time, tc.usage_start_time, tc.usage_end_time, tc.usage_date
ORDER BY
  tc.usage_date DESC, tc.workspace_id, tc.job_id, tc.run_id, tc.task_key

4 REPLIES

LynDCode
New Contributor II

Can you explain more the nature of the duplicates you are getting?

nayan_wylde
Esteemed Contributor

It seems the duplicates are caused by task_change_time from the job_tasks table. Although the table definition describes change_time as the last time the task was modified, job_tasks is an SCD Type 2 table, so a single task can appear with several different change_time values. I updated the query to drop task_change_time from the output and grouping; can you please try it? You could also raise a Databricks ticket to ask why change_time differs even though the task was not updated. (A quick check for this duplication is sketched after the query.)

WITH workspace AS (
  SELECT
    account_id,
    workspace_id,
    workspace_name,
    workspace_url,
    status
  FROM system.access.workspaces_latest
),
usage_with_ws_filtered_by_date AS (
  SELECT
    u.*,
    w.workspace_name,
    w.workspace_url
  FROM system.billing.usage u
  INNER JOIN workspace w ON u.workspace_id = w.workspace_id
  WHERE u.billing_origin_product = 'JOBS'
    AND u.usage_date BETWEEN DATE_ADD(CURRENT_DATE(), -30) AND CURRENT_DATE()
),
task_usage AS (
  SELECT
    u.workspace_id,
    u.workspace_name,
    u.workspace_url,
    u.usage_metadata.job_id,
    u.usage_metadata.job_run_id AS run_id,
    t.task_key,
    t.change_time AS task_change_time,
    u.usage_start_time,
    u.usage_end_time,
    u.usage_quantity,
    u.sku_name,
    u.identity_metadata.run_as AS user,
    u.usage_date
  FROM usage_with_ws_filtered_by_date u
  INNER JOIN system.lakeflow.job_tasks t
    ON u.workspace_id = t.workspace_id
    AND u.usage_metadata.job_id = t.job_id
),
task_costs AS (
  SELECT
    tu.*,
    lp.pricing.default AS unit_price,
    tu.usage_quantity * lp.pricing.default AS cost
  FROM task_usage tu
  LEFT JOIN system.billing.list_prices lp
    ON tu.sku_name = lp.sku_name
    AND tu.usage_start_time >= lp.price_start_time
    AND (tu.usage_end_time <= lp.price_end_time OR lp.price_end_time IS NULL)
    AND lp.currency_code = 'USD'
)
SELECT DISTINCT
  tc.workspace_id,
  tc.workspace_name,
  tc.workspace_url,
  tc.user,
  tc.job_id,
  tc.run_id,
  tc.task_key,
  tc.usage_start_time,
  tc.usage_end_time,
  tc.usage_date,
  SUM(tc.cost) AS total_cost
FROM task_costs tc
GROUP BY
  tc.workspace_id, tc.workspace_name, tc.workspace_url, tc.user, tc.job_id, tc.run_id,
  tc.task_key, tc.usage_start_time, tc.usage_end_time, tc.usage_date
ORDER BY
  tc.usage_date DESC, tc.workspace_id, tc.job_id, tc.run_id, tc.task_key
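
To confirm the SCD Type 2 behaviour described above, a quick check along these lines (a minimal sketch against system.lakeflow.job_tasks) lists the tasks that carry more than one version row; each of those rows matches, and multiplies, the same usage records in the join above:

-- Sketch: find tasks with more than one history row in job_tasks
SELECT
  workspace_id,
  job_id,
  task_key,
  COUNT(*) AS version_rows,
  MIN(change_time) AS first_change_time,
  MAX(change_time) AS last_change_time
FROM system.lakeflow.job_tasks
GROUP BY workspace_id, job_id, task_key
HAVING COUNT(*) > 1
ORDER BY version_rows DESC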


dndeng
New Contributor II

The costs still exploded; it seems there is no way to get the cost per task, only per job.

mark_ott
Databricks Employee

You are seeing inflated cost numbers because your query groups by many columns (run_id, task_key, usage_start_time, and usage_end_time) without addressing the duplicate rows introduced by the join between system.billing.usage and system.lakeflow.job_tasks. This leads to double-counting usage and cost for the same task execution.

Key Issues

  • Duplicate task-cost entries: If system.billing.usage or system.lakeflow.job_tasks records have multiple rows per task execution (for example, due to granular SKU or pricing variations, or logs with multiple SKUs per run), SUM(tc.cost) aggregates duplicates rather than computing the true cost per task per job per day.

  • Grouping by too many detail columns: including run_id and the exact usage_start_time/usage_end_time timestamps prevents aggregation at the desired daily level.

To compute daily unique cost per task per job, perform aggregation at the correct level (task, job, day) and deduplicate usage with a window or distinct operation.
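
For instance, the window-based option can keep only the latest version of each task before joining. This is only a sketch of that idea, not the exact query used below:

-- Sketch: keep one row per task (the most recent version wins), so the
-- SCD Type 2 history in system.lakeflow.job_tasks cannot fan out usage rows
WITH latest_tasks AS (
  SELECT *
  FROM (
    SELECT
      t.*,
      ROW_NUMBER() OVER (
        PARTITION BY t.workspace_id, t.job_id, t.task_key
        ORDER BY t.change_time DESC
      ) AS rn
    FROM system.lakeflow.job_tasks t
  )
  WHERE rn = 1
)
SELECT workspace_id, job_id, task_key, change_time
FROM latest_tasks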

Example Solution

Below is a refactored query that:

  • Deduplicates task usage per workspace, job, run, task, and day.

  • Aggregates cost per unique task execution per day.

WITH workspace AS (
  SELECT
    account_id,
    workspace_id,
    workspace_name,
    workspace_url,
    status
  FROM system.access.workspaces_latest
),
usage_with_ws_filtered_by_date AS (
  SELECT
    u.*,
    w.workspace_name,
    w.workspace_url
  FROM system.billing.usage u
  INNER JOIN workspace w ON u.workspace_id = w.workspace_id
  WHERE u.billing_origin_product = 'JOBS'
    AND u.usage_date BETWEEN DATE_ADD(CURRENT_DATE(), -30) AND CURRENT_DATE()
),
task_usage AS (
  SELECT
    u.workspace_id,
    u.workspace_name,
    u.workspace_url,
    u.usage_metadata.job_id,
    u.usage_metadata.job_run_id AS run_id,
    t.task_key,
    u.usage_start_time,
    u.usage_end_time,
    u.usage_quantity,
    u.sku_name,
    u.identity_metadata.run_as AS user,
    u.usage_date
  FROM usage_with_ws_filtered_by_date u
  INNER JOIN system.lakeflow.job_tasks t
    ON u.workspace_id = t.workspace_id
    AND u.usage_metadata.job_id = t.job_id
    AND u.usage_metadata.job_run_id = t.job_run_id  -- Ensures exact match per run
),
task_costs AS (
  SELECT
    tu.*,
    lp.pricing.default AS unit_price,
    tu.usage_quantity * lp.pricing.default AS cost
  FROM task_usage tu
  LEFT JOIN system.billing.list_prices lp
    ON tu.sku_name = lp.sku_name
    AND tu.usage_start_time >= lp.price_start_time
    AND (tu.usage_end_time <= lp.price_end_time OR lp.price_end_time IS NULL)
    AND lp.currency_code = 'USD'
),
deduped_task_costs AS (
  SELECT
    workspace_id,
    workspace_name,
    workspace_url,
    user,
    job_id,
    run_id,
    task_key,
    usage_date,
    SUM(cost) AS daily_task_cost
  FROM task_costs
  GROUP BY
    workspace_id, workspace_name, workspace_url, user, job_id, run_id, task_key, usage_date
)
SELECT *
FROM deduped_task_costs
ORDER BY usage_date DESC, workspace_id, job_id, run_id, task_key;

Key changes:

  • The final grouping is at the [workspace, job, run, task, day] level.

  • No grouping by highly granular fields (e.g., usage_start_time) that cause duplicates.

  • Adds a join condition on job_run_id that helps deduplicate runs.

Next Steps

  • If you still see large cost numbers, check your raw data for duplicates (multiple usage SKUs for one task/run); a diagnostic sketch follows this list.

  • If necessary, aggregate at a higher level—such as per task per job per workspace per day (without grouping by run_id)—if each run represents duplicate usage for the same logical task.
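
A diagnostic along these lines (a sketch that reads system.billing.usage only) lists the job runs that emit more than one usage row in the last 30 days. Several rows per run are normal, for example across different SKUs or reporting windows; the problem is only when those rows are multiplied again by a fan-out join:

-- Sketch: runs with more than one billing usage record
SELECT
  workspace_id,
  usage_metadata.job_id     AS job_id,
  usage_metadata.job_run_id AS run_id,
  COUNT(*)                  AS usage_rows,
  COUNT(DISTINCT sku_name)  AS distinct_skus
FROM system.billing.usage
WHERE billing_origin_product = 'JOBS'
  AND usage_date BETWEEN DATE_ADD(CURRENT_DATE(), -30) AND CURRENT_DATE()
GROUP BY workspace_id, usage_metadata.job_id, usage_metadata.job_run_id
HAVING COUNT(*) > 1
ORDER BY usage_rows DESC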

You may need to adjust the grouping to your real business definition of “cost per Task in each Job every time it was executed (daily),” depending on how you want to attribute runs and reruns. Let this structure be your base to tune further.
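
For example, to attribute cost per task per job per workspace per day regardless of individual runs, the final SELECT of the example above could be swapped for something like this sketch (it reuses the task_costs CTE defined there):

-- Sketch: daily cost per task, ignoring individual run_ids
SELECT
  workspace_id,
  workspace_name,
  job_id,
  task_key,
  usage_date,
  SUM(cost) AS daily_task_cost
FROM task_costs
GROUP BY workspace_id, workspace_name, job_id, task_key, usage_date
ORDER BY usage_date DESC, workspace_id, job_id, task_key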