cancel
Showing results for 
Search instead for 
Did you mean: 
Community Articles
Dive into a collaborative space where members like YOU can exchange knowledge, tips, and best practices. Join the conversation today and unlock a wealth of collective wisdom to enhance your experience and drive success.
cancel
Showing results for 
Search instead for 
Did you mean: 

Refresh PBI Dataset is consuming unnecessary compute

Debasis_Pal
New Contributor II

If I call powerbi refresh from databricks workflow job it waits until the refresh is complete unlike ADF(Azure Data Factory) pbi refresh through rest api. In case of big pbi dataset which might take more than a hour to refresh in databricks compute remains on for the whole timebeing as it waits until pbi refresh is complete.

Whereas in ADF it is a asynchronous rest api call which just hit refesh and doesn't wait.

What could be optimal solution to avoid unnecessary wait in databricks workflow job.

#powerbi #Databricksjob

1 ACCEPTED SOLUTION

Accepted Solutions

szymon_dybczak
Esteemed Contributor III

Hi @Debasis_Pal ,

The current Power BI task that is available in databricks workflow will wait for refresh process to return correct status (whether it succeeded or failed).

If you need to replicate the same behaviour as in ADF you can start refresh process by using asynchronous REST API call. The refresh process will start and your cluster can be terminated after that. I'm using this approach in one of the client I worked with.

https://learn.microsoft.com/en-us/power-bi/connect-data/asynchronous-refresh

 

import time
import requests
from azure.identity import ClientSecretCredential


_AUTHORITY = "https://login.microsoftonline.com/"
_PBI_SCOPE = "https://analysis.windows.net/powerbi/api/.default"
_REFRESHES_URL = "https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets/{dataset_id}/refreshes"

_TERMINAL_STATUSES = {"Completed", "Failed", "Disabled"}
_POLL_INTERVAL_SECONDS = 120


class PowerBIRefreshError(Exception):
    pass


def _get_credential(dbutils, secret_scope, tenant_key, client_id_key, client_secret_key):
    return ClientSecretCredential(
        authority=_AUTHORITY,
        tenant_id=dbutils.secrets.get(scope=secret_scope, key=tenant_key),
        client_id=dbutils.secrets.get(scope=secret_scope, key=client_id_key),
        client_secret=dbutils.secrets.get(scope=secret_scope, key=client_secret_key),
    )


def _auth_headers(credential):
    token = credential.get_token(_PBI_SCOPE).token
    return {"Authorization": f"Bearer {token}"}


def _get_latest_refresh_status(url, credential):
    response = requests.get(url, headers=_auth_headers(credential))
    response.raise_for_status()

    refreshes = response.json().get("value", [])
    return refreshes[0]["status"] if refreshes else None


def _raise_for_pbi_status(response):
    messages = {
        400: "Bad Request: missing or malformed parameters.",
        401: "Unauthorized: authentication failed or insufficient permissions.",
        403: "Forbidden: authenticated user lacks access to the resource.",
        404: "Not Found: the requested resource does not exist.",
        500: "Internal Server Error: an error occurred on the Power BI service.",
    }
    message = messages.get(response.status_code)
    if message:
        raise PowerBIRefreshError(message)
    response.raise_for_status()


def refresh_power_bi_model(
    dbutils,
    secret_scope,
    secret_tenant_key,
    secret_client_id_key,
    secret_client_secret_key,
    workspace_id,
    dataset_id,
    apply_refresh_policy,
    commit_mode,
    poll_interval=_POLL_INTERVAL_SECONDS,
):
    """
    Triggers a Power BI dataset refresh and polls until it reaches a terminal state.

    Returns the terminal status string: "Completed", "Failed", or "Disabled".
    Raises PowerBIRefreshError on API errors or if a refresh is already in progress.
    """
    url = _REFRESHES_URL.format(workspace_id=workspace_id, dataset_id=dataset_id)
    credential = _get_credential(
        dbutils, secret_scope, secret_tenant_key, secret_client_id_key, secret_client_secret_key
    )

    if _get_latest_refresh_status(url, credential) == "InProgress":
        raise PowerBIRefreshError("Dataset is already refreshing. Cannot start a new refresh.")

    payload = {
        "type": "full",
        "commitMode": commit_mode,
        "maxParallelism": 6,
        "retryCount": 0,
        "applyRefreshPolicy": apply_refresh_policy,
    }
    response = requests.post(url, headers=_auth_headers(credential), json=payload)

    if not response.ok:
        _raise_for_pbi_status(response)

    while True:
        status = _get_latest_refresh_status(url, credential)
        if status in _TERMINAL_STATUSES:
            return status
        time.sleep(poll_interval)

 

If my answer was helpful, please consider marking it as accepted solution.

View solution in original post

2 REPLIES 2

Ashwin_DSA
Databricks Employee
Databricks Employee

Hi @Debasis_Pal,

This is expected behaviour with the Databricks Power BI task. If you enable "Refresh after update," Databricks doesn’t just publish the semantic model metadata... it also triggers the Power BI data refresh and waits for that step to finish, which is why the workflow can sit there holding compute for a long-running Import model refresh. The product docs call out that this setting is optional and only applies when you're using Import mode, so for very large datasets, a better pattern is usually to leave that option off in the Databricks job and trigger the Power BI refresh asynchronously from something lightweight like ADF, Logic Apps, or an Azure Function instead. The Databricks docs for the Power BI task are here: Power BI task for jobs.

If the goal is to avoid long refresh windows altogether, it’s also worth checking whether the model can move away from full Import refresh behaviour. Databricks documents that with DirectQuery, the data is not stored in Power BI, and queries are pushed to the SQL warehouse at query time, which is often a better fit when freshness matters more than imported-cache performance. There’s also broader guidance here on Power BI with Azure Databricks: Power BI with Azure Databricks.

The optimal approach is usually to decouple "publish/update the model" from "refresh the dataset." Let Databricks finish the data prep and model update, then let an external async orchestrator kick off the Power BI refresh so your Databricks compute is not waiting around for an hour just to monitor Power BI.

If this answer resolves your question, could you mark it as “Accept as Solution”? That helps other users quickly find the correct fix.

Regards,
Ashwin | Delivery Solution Architect @ Databricks
Helping you build and scale the Data Intelligence Platform.
***Opinions are my own***

szymon_dybczak
Esteemed Contributor III

Hi @Debasis_Pal ,

The current Power BI task that is available in databricks workflow will wait for refresh process to return correct status (whether it succeeded or failed).

If you need to replicate the same behaviour as in ADF you can start refresh process by using asynchronous REST API call. The refresh process will start and your cluster can be terminated after that. I'm using this approach in one of the client I worked with.

https://learn.microsoft.com/en-us/power-bi/connect-data/asynchronous-refresh

 

import time
import requests
from azure.identity import ClientSecretCredential


_AUTHORITY = "https://login.microsoftonline.com/"
_PBI_SCOPE = "https://analysis.windows.net/powerbi/api/.default"
_REFRESHES_URL = "https://api.powerbi.com/v1.0/myorg/groups/{workspace_id}/datasets/{dataset_id}/refreshes"

_TERMINAL_STATUSES = {"Completed", "Failed", "Disabled"}
_POLL_INTERVAL_SECONDS = 120


class PowerBIRefreshError(Exception):
    pass


def _get_credential(dbutils, secret_scope, tenant_key, client_id_key, client_secret_key):
    return ClientSecretCredential(
        authority=_AUTHORITY,
        tenant_id=dbutils.secrets.get(scope=secret_scope, key=tenant_key),
        client_id=dbutils.secrets.get(scope=secret_scope, key=client_id_key),
        client_secret=dbutils.secrets.get(scope=secret_scope, key=client_secret_key),
    )


def _auth_headers(credential):
    token = credential.get_token(_PBI_SCOPE).token
    return {"Authorization": f"Bearer {token}"}


def _get_latest_refresh_status(url, credential):
    response = requests.get(url, headers=_auth_headers(credential))
    response.raise_for_status()

    refreshes = response.json().get("value", [])
    return refreshes[0]["status"] if refreshes else None


def _raise_for_pbi_status(response):
    messages = {
        400: "Bad Request: missing or malformed parameters.",
        401: "Unauthorized: authentication failed or insufficient permissions.",
        403: "Forbidden: authenticated user lacks access to the resource.",
        404: "Not Found: the requested resource does not exist.",
        500: "Internal Server Error: an error occurred on the Power BI service.",
    }
    message = messages.get(response.status_code)
    if message:
        raise PowerBIRefreshError(message)
    response.raise_for_status()


def refresh_power_bi_model(
    dbutils,
    secret_scope,
    secret_tenant_key,
    secret_client_id_key,
    secret_client_secret_key,
    workspace_id,
    dataset_id,
    apply_refresh_policy,
    commit_mode,
    poll_interval=_POLL_INTERVAL_SECONDS,
):
    """
    Triggers a Power BI dataset refresh and polls until it reaches a terminal state.

    Returns the terminal status string: "Completed", "Failed", or "Disabled".
    Raises PowerBIRefreshError on API errors or if a refresh is already in progress.
    """
    url = _REFRESHES_URL.format(workspace_id=workspace_id, dataset_id=dataset_id)
    credential = _get_credential(
        dbutils, secret_scope, secret_tenant_key, secret_client_id_key, secret_client_secret_key
    )

    if _get_latest_refresh_status(url, credential) == "InProgress":
        raise PowerBIRefreshError("Dataset is already refreshing. Cannot start a new refresh.")

    payload = {
        "type": "full",
        "commitMode": commit_mode,
        "maxParallelism": 6,
        "retryCount": 0,
        "applyRefreshPolicy": apply_refresh_policy,
    }
    response = requests.post(url, headers=_auth_headers(credential), json=payload)

    if not response.ok:
        _raise_for_pbi_status(response)

    while True:
        status = _get_latest_refresh_status(url, credential)
        if status in _TERMINAL_STATUSES:
            return status
        time.sleep(poll_interval)

 

If my answer was helpful, please consider marking it as accepted solution.