DatabricksGuide
Community Manager

Building a Forecasting Model on Databricks: A Step-by-Step Guide

This guide walks through building a forecasting model on Databricks step by step: selecting and preparing data, training a model, logging it with MLflow, deploying it to a serving endpoint, and visualizing the forecasts.

This guide assumes basic familiarity with Databricks notebooks, PySpark, and common machine learning concepts.

What Will You Learn?

This guide is structured to provide a step-by-step approach to advanced forecasting and model deployment, incorporating best practices and leveraging modern tools such as MLflow. By the end of this guide, you will be equipped with the knowledge to:

  • Effectively Prepare and Process Data: Understand how to select, clean, and prepare your data for forecasting, emphasizing the importance of feature engineering and data quality.
  • Build Robust Forecasting Models: Dive into the selection of sophisticated forecasting models that cater to various data characteristics. 
  • Master Model Logging with MLflow: Learn the intricacies of model logging, enabling efficient tracking of model versions, parameters, and performance metrics. 
  • Deploy Models with Confidence: Navigate through the model serving landscape, understanding different deployment strategies for making your models accessible for predictions.
  • Visualize and Interpret Model Predictions: Discover techniques for visualizing forecasting results, allowing you to compare predicted values against actual historical data and assess model performance visually.
  • Best Practices for Forecasting and Model Deployment: Throughout the guide, insights into best practices will prepare you to tackle common challenges and make informed decisions.

Data Selection

Data selection is a crucial first step for successful time series forecasting. Here are the key aspects to consider:

  • Identify the Forecasting Goal: What are you trying to predict? Sales, website traffic, resource utilization? Clearly define the target variable you want to forecast.
  • Data Frequency: Choose a data frequency that aligns with your forecasting needs. Daily, weekly, monthly, or even hourly data might be suitable depending on the problem.
  • Data Sufficiency: Make sure you have enough data to train and validate your forecasting model. The amount of data required depends on the model complexity (e.g., LSTMs typically require more data than ARIMA).

A successful forecasting model relies on a well-structured dataset. For this tutorial, your data should have at least the following columns:

  • Date: Timestamp for each observation.
  • Metric: The specific aspect you’re forecasting (e.g., sales, website traffic).
  • Metric Value: The numerical data point you aim to predict (e.g., number of users, sales amount).

Ensure the data is at the correct frequency (daily, weekly, etc.) and that you have enough data to capture seasonal patterns or trends relevant to your goals.
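
One way to check frequency and completeness before modeling is to look for missing timestamps. The following is a minimal sketch using only the Python standard library. The function name find_missing_dates and the sample dates are hypothetical, and it assumes a fixed step measured in whole days.

```python
from datetime import date, timedelta

def find_missing_dates(observed, step_days=1):
    """Return the dates absent from `observed`, assuming a fixed step in days."""
    present = set(observed)
    current, last = min(present), max(present)
    missing = []
    while current <= last:
        if current not in present:
            missing.append(current)
        current += timedelta(days=step_days)
    return missing

# Hypothetical daily series with one gap (2024-01-03 is absent)
observed_dates = [date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 4), date(2024, 1, 5)]
missing_dates = find_missing_dates(observed_dates)
print(missing_dates)
```

A non-empty result suggests the series needs gap filling or a coarser frequency before it is passed to a forecasting model.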

Ingest Setup Steps [20 minutes]

Step 0: Check your required prerequisites

This section of the guide assumes you have the following:

  1. You have a Databricks workspace up and running
  2. Your account has Unity Catalog (UC) enabled. UC is enabled by default.
  3. You have admin permissions for your Databricks account
  4. You have access to your AWS account and data residing in an S3 storage bucket

Step 1: Access and start your warehouse

To get started with Databricks, you need to start your starter warehouse. This compute resource lets you query and explore data on Databricks using SQL. Follow these steps:

  1. In your Databricks workspace, click Compute on the sidebar.
  2. At the top of the page, click the SQL Warehouses subtab.
  3. Locate the Starter Warehouse in your SQL warehouse list view.
  4. Click the start icon next to the warehouse to start it.

Note: Only users with workspace administrator permissions can start the starter warehouse. If you don’t have admin permissions, contact your administrator to get access to the starter warehouse or to create a new serverless warehouse.

 

Step 2: Connect your workspace to data sources

To connect your Databricks workspace to your cloud storage, you need to create an external location. An external location is an object that combines a cloud storage path with the credential that authorizes access to the storage path.

DatabricksGuide_0-1715968895219.gif

Watch and follow along in your workspace as you go!

  1. In your Databricks workspace, click Catalog on the sidebar.
  2. At the top of the page, click + Add.
  3. Click Add an external location.
  4. Databricks recommends using the AWS Quickstart, which ensures that your workspace is given the correct permissions on the bucket.
  5. In Bucket Name, enter the name of the bucket you want to import data from.
  6. Click Generate New Token and copy the token.
  7. Click Launch in Quickstart.
  8. In your AWS console, enter the copied token in the Databricks Personal Access Token field.
  9. Select the I acknowledge that AWS CloudFormation might create IAM resources with custom names checkbox.
  10. Click Create stack.

To see the external locations in your workspace, click Catalog in the sidebar, click External Data at the bottom of the left navigation pane, and then click External Locations. Your new external location will have a name using the following syntax: db_s3_external_databricks-S3-ingest-<id>.

To verify that an external location has a functioning connection, do the following:

  1. Click the external location you want to test.
  2. Click Test connection.

For help, contact onboarding-help@databricks.com or your Databricks account team directly.

Step 3: Add your data to Databricks

Now that your workspace has a connection to your S3 bucket, you can add your data.

Part of this step is choosing where to put your data. Databricks has a three-level namespace that organizes your data (catalog.schema.table). For this exercise, you import the data into the default catalog named after your workspace.

  1. In the sidebar of your Databricks workspace, click New > Add data.
  2. Click Amazon S3.
  3. Select your external location from the drop-down menu.
  4. Select all the files you want to add to your Databricks catalog.
  5. Click Preview table.
  6. Select the default catalog (named after your workspace), the default schema, and then enter a name for your table.
  7. Click Create Table.

You can now use Catalog Explorer in your workspace to see your data in Databricks.
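
To illustrate the three-level namespace, the sketch below builds a fully qualified table name and a preview query. The helper qualified_table_name and the names my_workspace, default, and daily_sales are hypothetical placeholders, not values from this guide.

```python
def qualified_table_name(catalog, schema, table):
    """Build a backtick-quoted catalog.schema.table identifier."""
    parts = (catalog, schema, table)
    for part in parts:
        if not part or "`" in part:
            raise ValueError(f"Invalid identifier part: {part!r}")
    return ".".join(f"`{part}`" for part in parts)

# Hypothetical names; replace with your own catalog, schema, and table
table_name = qualified_table_name("my_workspace", "default", "daily_sales")
preview_query = f"SELECT * FROM {table_name} LIMIT 5"
print(preview_query)

# In a Databricks notebook, where `spark` is predefined, you could then run:
# spark.sql(preview_query).show()
```

Quoting each part with backticks keeps the query valid when a name contains characters such as hyphens.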

Forecasting on Databricks

Once you have selected your data, there are two primary ways to build a forecasting model on Databricks:

  1. Databricks AutoML: This is a user-friendly option, especially for those new to forecasting or machine learning. Databricks AutoML automates the process of selecting and training the most suitable forecasting model for your data.
  2. Databricks Notebook: This is designed for users with experience in machine learning and time series forecasting. It offers more control and flexibility for building customized models.

A) Forecasting Model Using AutoML

Databricks AutoML is a valuable tool for getting started with time series forecasting on Databricks. It streamlines the process, reduces development time, and provides a solid baseline model. Use the following steps to set up an AutoML Model.

Set up the Forecasting Problem

You can set up a forecasting problem using the AutoML UI with the following steps:

 

DatabricksGuide_0-1715967500817.gif
  1. Click New > AutoML Experiment.
  2. In the Compute field, select a cluster running Databricks Runtime 10.0 ML or above.
  3. From the ML problem type drop-down menu, select Forecasting.
  4. Under Dataset, click Browse. Navigate to the table you want to use and click Select. The table schema appears.
  5. Click in the Prediction target field. A dropdown menu appears listing the columns shown in the schema. Select the column you want the model to predict.
  6. Click in the Time column field. A drop-down appears showing the dataset columns that are of type timestamp or date. Select the column containing the time periods for the time series.
  7. For multi-series forecasting, select the column(s) that identify the individual time series from the Time series identifiers drop-down. AutoML groups the data by these columns as different time series and trains a model for each series independently. 
  8. In the Forecast horizon and frequency fields, specify the number of time periods into the future for which AutoML should calculate forecasted values.
  9. In Databricks Runtime 10.5 ML and above, you can save prediction results. To do so, specify a database in the Output Database field. Click Browse and select a database from the dialog. AutoML writes the prediction results to a table in this database.
  10. The Experiment name field shows the default name. To change it, type the new name in the field.

Register the Best Model

  1. Once your AutoML experiment finishes, identify the best performing model run based on your chosen metric.
  2. In the Experiments UI, navigate to the run details of the best model.
  3. Look for a "Register Model" button or option. 
  4. Select the option to register the model in Unity Catalog.
  5. Run the provided code in a notebook to register the model.

Create a Serving Endpoint

DatabricksGuide_1-1715967500776.gif
  1. Click New > Serving Endpoint.
  2. Initiate the creation of the endpoint. In the Name field, provide a descriptive name for your endpoint.
  3. Under the Served entities section, click on the Entity field.
  4. Choose Unity Catalog as the source and select your model by choosing the appropriate catalog, schema and model name.
  5. Select the specific model and its version you want to deploy.
  6. Select the appropriate Compute Type (CPU for this model) based on your model's requirements.
  7. Select ‘Small’ as the Compute Scale-out.
  8. Once you've configured the settings, click the Create button.

Monitor Endpoint Status: The Serving endpoints page will display your newly created endpoint. The initial state will likely be "Pending" as deployment progresses. You can refresh the page to monitor the endpoint's status until it becomes "Active."
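
If you prefer to wait for the endpoint from code rather than refreshing the page, the general pattern is a polling loop with a timeout. The sketch below is generic and makes several assumptions: wait_for_state is a hypothetical helper, get_state stands in for whatever status lookup you use, and the state labels simply mirror the "Pending" and "Active" labels described above.

```python
import time

def wait_for_state(get_state, target="Active", timeout_s=600, interval_s=10, sleep=time.sleep):
    """Poll get_state() until it returns `target` or the timeout elapses."""
    waited = 0
    while True:
        state = get_state()
        if state == target:
            return True
        if waited >= timeout_s:
            return False
        sleep(interval_s)
        waited += interval_s

# Stand-in for a real status lookup: reports "Pending" twice, then "Active"
_states = iter(["Pending", "Pending", "Active"])
became_active = wait_for_state(lambda: next(_states), sleep=lambda seconds: None)
print(became_active)
```

Passing the sleep function as a parameter keeps the loop easy to exercise without real waiting.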

B) Forecasting Model Using Notebook

Select Data

# Install dependencies. On Databricks, %pip installs notebook-scoped libraries.
%pip install prophet
%pip install databricks-sdk==0.12.0
%pip install -U "pandas<2.0.0"
dbutils.library.restartPython()

# Select data
from pyspark.sql.functions import col

# Update the query to read from your table and replace each <placeholder>.
# Adjust or remove the is_customer filter if your table has no such column.
query = """
    SELECT <date>, <metric>, <metric_value>
    FROM <catalog_name>.<schema_name>.<table_name>
    WHERE is_customer = 'true' AND <date> > '2020-01-01'
    ORDER BY <date> DESC
"""

df = spark.sql(query)

# Choose a single metric to make the calculations simpler
df = df.filter(col("<metric>") == "xxxxxxxxxxxxxxxxxxxxx") #Replace <metric> with your metric column name and the string with the metric to keep
df.show(5)

Data Preparation

Effectively preparing your data is a foundational step in the forecasting process. Proper preparation ensures the accuracy and reliability of your model's predictions.

  • Missing Values: Identify and address missing values in your data. Common strategies include deletion (if minimal), imputation (filling in missing values with statistical methods or previous observations), or interpolation (estimating missing values based on surrounding data points).
  • Outliers: Identify and handle outliers, which are extreme data points that can significantly distort your forecasts. You can choose to remove outliers if they are truly erroneous or winsorize them (capping their values to a certain threshold).
  • Time Consistency: Ensure your data has consistent timestamps and that the time steps are evenly spaced (e.g., daily data points should be recorded at the same time each day).
  • Feature Engineering: Create new features from existing ones if it can improve the forecasting model's performance. This might involve calculating rolling averages, seasonality indicators, or lag features (past values of the target variable).
from pyspark.sql.functions import col, lit

# Dropping rows with missing values in the 'metric_value' column
cleaned_df = df.na.drop(subset=["<metric_value>"]) #Replace <metric_value> with your metric value column name
cleaned_df.show(5)

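# Calculating IQR and defining bounds for outliers
quartiles = cleaned_df.approxQuantile("<metric_value>", [0.25, 0.75], 0.05) #Replace <metric_value> with your metric value column name
IQR = quartiles[1] - quartiles[0]
# The lower bound is fixed at 0 instead of Q1 - 1.5 * IQR, which assumes the metric is never negative.
# Combined with the strict ">" in the filter below, rows with a value of 0 are dropped as well.
lower_bound = 0
upper_bound = quartiles[1] + 1.5 * IQR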

# Filtering out outliers
no_outliers_df = cleaned_df.filter(
    (col("<metric_value>") > lit(lower_bound)) #Replace <metric_value> with your metric value column name
    & (col("<metric_value>") <= lit(upper_bound)) #Replace <metric_value> with your metric value column name
)

# Showing the updated DataFrame
no_outliers_df.show(5)
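
The code above drops outliers, while the bullets also mention winsorizing and lag features. Those two ideas can be sketched in plain Python as follows. This is an illustration only: the helper names and sample values are hypothetical, and on a real dataset you would more likely express the same logic with Spark or pandas functions.

```python
import statistics

def winsorize_iqr(values, k=1.5):
    """Cap values at the IQR fences instead of dropping them."""
    q1, _, q3 = statistics.quantiles(values, n=4)
    iqr = q3 - q1
    lower, upper = q1 - k * iqr, q3 + k * iqr
    return [min(max(v, lower), upper) for v in values]

def lag_and_rolling_mean(values, lag=1, window=3):
    """Return (lagged value, trailing mean) pairs; None where history is too short."""
    rows = []
    for i in range(len(values)):
        lagged = values[i - lag] if i >= lag else None
        past = values[max(0, i - window):i]  # excludes the current value
        rolling = sum(past) / window if len(past) == window else None
        rows.append((lagged, rolling))
    return rows

# Hypothetical series, oldest first, with one extreme value at the end
values = [10, 12, 11, 13, 12, 100]
capped = winsorize_iqr(values)
features = lag_and_rolling_mean(capped)
print(capped)
print(features)
```

The trailing mean deliberately excludes the current observation so that a feature never contains the value it is meant to help predict.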

Model Selection

The model you choose depends on the nature of your data and the specific forecasting problem you’re trying to solve. Consider data characteristics such as frequency (daily vs. weekly), granularity (hourly vs. daily sales), seasonality, and external factors such as holidays and promotions.

Machine learning methods:

  • Prophet: User-friendly and specifically designed for time series forecasting, offering built-in seasonality and holiday handling.
  • ARIMA: A classical statistical method for time series forecasting, capturing short-term patterns and trends in stationary data through autocorrelation, differencing, and moving average components.
  • LSTMs (Long Short-Term Memory): Powerful for capturing complex relationships and long-term dependencies in time series data.

For the scope of this tutorial, Prophet will serve as our primary model. Prophet stands out for its user-friendly nature and robust handling of various time series forecasting challenges, making it a versatile option for a wide range of applications.

Model Choice Justification

When deciding on which model to use, evaluate your dataset's characteristics carefully:

  • Prophet is particularly beneficial when your data includes strong seasonal effects and you have some domain knowledge to incorporate holidays and other special events.
  • ARIMA works well if your time series is relatively stable and exhibits trends and autocorrelation that ARIMA's structure can model.
  • LSTMs offer the greatest flexibility and learning capability for complex and long-term dependencies, but at the cost of needing more data and computational resources.

Each model has its strengths, and sometimes a combination or ensemble approach might yield the best results. Trial and experimentation with each model on your specific dataset are essential steps to identify the most suitable model for your forecasting goals.
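
When experimenting with several models, one common approach is to hold out the most recent observations and compare each candidate against a simple baseline. The sketch below assumes a seasonal-naive baseline with a weekly season, which is a choice made for this illustration rather than something this guide prescribes. The function names and sample values are hypothetical.

```python
def chronological_split(series, holdout):
    """Split an ordered series into train and the last `holdout` points."""
    if not 0 < holdout < len(series):
        raise ValueError("holdout must be between 1 and len(series) - 1")
    return series[:-holdout], series[-holdout:]

def seasonal_naive_forecast(train, horizon, season_length=7):
    """Repeat the last full season of `train` for `horizon` steps."""
    last_season = train[-season_length:]
    return [last_season[i % len(last_season)] for i in range(horizon)]

# Hypothetical daily values, oldest first, with a weekly pattern
daily_values = [20, 22, 21, 23, 25, 30, 31, 21, 23, 22, 24, 26, 31, 32]
train, test = chronological_split(daily_values, holdout=7)
baseline = seasonal_naive_forecast(train, horizon=len(test))
baseline_mae = sum(abs(a - f) for a, f in zip(test, baseline)) / len(test)
print(baseline, baseline_mae)
```

A candidate model that cannot beat this baseline on the held-out period is probably not capturing more than the weekly pattern.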

from prophet import Prophet
from pyspark.sql.functions import col, to_date

# Prophet requires at least two columns: ds (date) and y (value to forecast)
train_df = no_outliers_df.select(
    to_date(col("<date>")).alias("ds"),
    col("<metric>"),
    col("<metric_value>").cast("double").alias("y"),
).orderBy(col("ds").desc())

# Set model parameters
prophet_model = Prophet(
  interval_width=0.95,
  growth='linear',
  daily_seasonality=False,  # ds is truncated to whole days, so there is no within-day pattern to fit
  weekly_seasonality=True,
  yearly_seasonality=True,
  seasonality_mode='additive'
  )
 
# fit the model to historical data
history_pd = train_df.toPandas()
prophet_model.fit(history_pd)

Fit Data & Build Forecast

To effectively utilize Prophet for forecasting in Databricks, follow these concise steps:

  • Data Preparation: Your dataset must have two columns: one for the datetime (ds) in YYYY-MM-DD format, and another for the metric you're forecasting (y). This format is crucial for the model to correctly learn from your data.
  • Model Initialization and Fitting: Once your data is properly formatted, you will initiate a Prophet model instance and fit it with your historical data. During this step, Prophet analyzes your data's patterns to prepare for forecasting.
  • Forecasting Future Values: After fitting the model, you will create a DataFrame that outlines the future dates you wish to predict. Using this DataFrame, Prophet will generate forecasts for the specified future dates. 

# The model was already fit in the previous step. A Prophet object can only be
# fit once, so calling fit() again on prophet_model would raise an exception.

# Define a dataset with the historical dates plus 10 days beyond the last available date
future_pd = prophet_model.make_future_dataframe(
  periods=10, 
  freq='D', 
  include_history=True
  )
 
#Forecast
forecast_pd = prophet_model.predict(future_pd)
display(forecast_pd)
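
To make the shape of the future DataFrame concrete, the sketch below reproduces the idea with the standard library: the historical dates, sorted and de-duplicated, followed by a fixed number of additional days. It mirrors the concept only and is not Prophet's implementation. The function name future_dates and the sample dates are hypothetical.

```python
from datetime import date, timedelta

def future_dates(history, periods, include_history=True):
    """Return history dates (sorted, unique) followed by `periods` extra days."""
    ordered = sorted(set(history))
    last = ordered[-1]
    future = [last + timedelta(days=i) for i in range(1, periods + 1)]
    return ordered + future if include_history else future

# Hypothetical history, deliberately unsorted
history = [date(2024, 4, 27), date(2024, 4, 26), date(2024, 4, 28)]
all_dates = future_dates(history, periods=3)
only_future = future_dates(history, periods=3, include_history=False)
print(all_dates)
print(only_future)
```

Including the history means the forecast also contains fitted values for past dates, which is what makes the comparison against actuals possible.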

Evaluation

Evaluate the model by comparing its forecasts to actual data and calculating performance metrics such as Mean Squared Error (MSE).

Performance Metrics:

  • Mean Squared Error (MSE): Measures the average squared difference between predicted and actual values. Because the errors are squared, large errors dominate the result. Lower MSE values indicate smaller errors.
  • Root Mean Squared Error (RMSE): Represents the square root of MSE, thus re-scaling errors to the original units of the target variable, which improves interpretability.
  • Mean Absolute Error (MAE): Averages the absolute differences between predicted and actual values. MAE is more robust to outliers than MSE because it does not square the errors.

Interpreting the Metrics

  • MSE and RMSE are more sensitive to outliers because the errors are squared, so they are often used when large errors are particularly undesirable.
  • MAE is straightforward and easy to interpret, as it directly represents the average error.

The choice between these metrics should be informed by your specific forecasting objectives and the nature of the data. In practice, assessing model performance might involve looking at multiple metrics to get a comprehensive view of the model's accuracy.
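
The difference in outlier sensitivity is easy to see with a small hand calculation. The sketch below computes the metrics directly from their definitions on made-up numbers. The function names and values are hypothetical and chosen only to illustrate the effect of one large error.

```python
from math import sqrt

def mean_abs_error(actual, predicted):
    """Average of the absolute errors."""
    return sum(abs(a - p) for a, p in zip(actual, predicted)) / len(actual)

def mean_sq_error(actual, predicted):
    """Average of the squared errors."""
    return sum((a - p) ** 2 for a, p in zip(actual, predicted)) / len(actual)

actual = [100, 102, 98, 101]
close_forecast = [101, 101, 99, 100]  # every error has magnitude 1
one_big_miss = [101, 101, 99, 121]    # the last error has magnitude 20

for name, forecast in [("close", close_forecast), ("one big miss", one_big_miss)]:
    mae_value = mean_abs_error(actual, forecast)
    mse_value = mean_sq_error(actual, forecast)
    print(name, mae_value, mse_value, round(sqrt(mse_value), 2))
```

With a single large miss, MAE rises from 1 to 5.75 while RMSE rises from 1 to about 10, which shows how squaring lets one error dominate.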

import pandas as pd

from sklearn.metrics import mean_squared_error, mean_absolute_error
from math import sqrt

# Align historical actuals with predictions on the date column. history_pd is sorted
# newest first and forecast_pd oldest first, so comparing them by position would pair
# the wrong rows. The inner join also drops the future dates, which have no actuals.
actuals_pd = history_pd[['ds', 'y']].copy()
actuals_pd['ds'] = pd.to_datetime(actuals_pd['ds'])
eval_pd = actuals_pd.merge(forecast_pd[['ds', 'yhat']], on='ds', how='inner')

# Calculate evaluation metrics (in-sample, because the model was fit on these rows)
mae = mean_absolute_error(eval_pd['y'], eval_pd['yhat'])
mse = mean_squared_error(eval_pd['y'], eval_pd['yhat'])
rmse = sqrt(mse)

# Print the metrics
print(f"MAE: {mae}")
print(f"MSE: {mse}")
print(f"RMSE: {rmse}")

Log the model with MLflow

Model Logging is critical for tracking the performance and changes to models over time, ensuring reproducibility and accountability. Techniques include:

  • MLflow Logging: Utilize MLflow's robust platform for logging models, parameters, and artifacts. It supports structured experiment tracking, perfect for recording and comparing different versions of your models.
  • Custom Logging: Implement tailored logging approaches to capture unique model insights or additional metadata not standardly logged by existing tools.

Benefits of Logging with MLflow

Logging models with MLflow offers several advantages:

  • Reproducibility: By capturing all necessary details of the experimentation phase, MLflow makes it easier to replicate results and understand decision-making processes.
  • Model Registry: MLflow allows for versioning of models, making it simple to manage and deploy specific model versions based on performance metrics.
  • Collaboration and Sharing: Teams can leverage MLflow’s centralized model storage to share models and results, enhancing collaboration.

For the purpose of this tutorial, we demonstrate how to efficiently log a Prophet model using MLflow, capturing essential information that supports further analysis and model deployment.

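import mlflow
from mlflow.tracking import MlflowClient
from mlflow.models.signature import infer_signature
from mlflow.pyfunc import PythonModel, log_model

# Register models in Unity Catalog (needed when the workspace default is the legacy workspace registry)
mlflow.set_registry_uri("databricks-uc")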

# Define the catalog, schema, and model name for organizing the model within the MLflow model registry
catalog = "catalog_name" #Update it to your catalog name
schema = "schema_name" #Update it to your schema name
model_name = "forecastingmodel" #Update it to your model name

class MyPythonModel(mlflow.pyfunc.PythonModel):
    def __init__(self, model):
        self.model = model

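    def predict(self, context, model_input):
        # model_input is not used: the model always returns the fitted history
        # plus a fixed 10-day forecast, whatever rows are sent to it
        future_pd = self.model.make_future_dataframe(periods=10, freq="D", include_history=True)
        forecast_pd = self.model.predict(future_pd)
        return forecast_pd[["ds", "yhat", "yhat_upper", "yhat_lower"]]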
    
wrapped_model = MyPythonModel(prophet_model)

# Start an MLflow run to track metrics and log the model
with mlflow.start_run(run_name="Prophet Model Run") as run:

    input_example = history_pd.head()[["ds", "y"]]
    # Build the output example from the wrapped model so the signature matches what the served model returns
    output_example = wrapped_model.predict(None, input_example).iloc[:10]

    # Log calculated metrics
    mlflow.log_metric("mae", mae)
    mlflow.log_metric("rmse", rmse)

    print(output_example)
    # Infer the signature of the machine learning model
    signature = infer_signature(input_example, output_example)

    # Update dependencies in the default conda environment
    env = mlflow.pyfunc.get_default_conda_env()
    env['dependencies'][-1]['pip'] += ["prophet==1.1.5"]
    env['dependencies'][-1]['pip'] += ["pandas==1.5.3"]

    # Log the trained model to MLflow with the inferred signature
    model_log = log_model(
        artifact_path="forecasting_model",
        python_model=wrapped_model,
        signature=signature,
        input_example=input_example,
        registered_model_name=f"{catalog}.{schema}.{model_name}",
        conda_env=env
    )

    # Retain the "run_id" for use with other MLflow functionalities like registering the model
    run_id = run.info.run_id

# Get the latest model version
def get_latest_model_version(model_name:str = None):
    latest_version = 1
    mlflow_client = MlflowClient()
    for mv in mlflow_client.search_model_versions(f"name='{model_name}'"):
      version_int = int(mv.version)
      if version_int > latest_version:
        latest_version = version_int
    return latest_version
  
model_version = get_latest_model_version(f"{catalog}.{schema}.{model_name}")

Deploy Models on Databricks

Deploying machine learning models into production on Databricks can be achieved through two primary methods: MLflow for batch inference and prediction, and Databricks Model Serving for real-time inference. Each serves different use cases based on the requirement for real-time responses and the scale of data processing.

  • MLflow for Batch Inference and Prediction: Batch processing is ideal for scenarios where predictions can be made on large datasets at once without the need for immediate responses. This method fits well with scheduled analytics and reporting.
  • Databricks Model Serving for Real-Time Inference: This method is better suited for scenarios where low latency and real-time responses are important.
import mlflow, os
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedModelInput

serving_endpoint_name = "forecasting_model_serving"

# Get the API endpoint and token for the current notebook context
API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)

w = WorkspaceClient(host=API_ROOT, token=API_TOKEN)
endpoint_config = EndpointCoreConfigInput(
    name=serving_endpoint_name,
    served_models=[
        ServedModelInput(
            model_name=f"{catalog}.{schema}.{model_name}",
            model_version=model_version,
            workload_type="CPU",
            workload_size="Small",
            scale_to_zero_enabled=True
        )
    ]
)

existing_endpoint = next(
    (e for e in w.serving_endpoints.list() if e.name == serving_endpoint_name), None
)
serving_endpoint_url = f"{API_ROOT}/ml/endpoints/{serving_endpoint_name}"
if existing_endpoint is None:
    print(f"Creating the endpoint {serving_endpoint_url}, this will take a few minutes to package and deploy the endpoint...")
    w.serving_endpoints.create_and_wait(name=serving_endpoint_name, config=endpoint_config)
else:
    print(f"Updating the endpoint {serving_endpoint_url} to version {model_version}, this will take a few minutes to package and deploy the endpoint...")
    w.serving_endpoints.update_config_and_wait(served_models=endpoint_config.served_models, name=serving_endpoint_name)
    
displayHTML(f'Your Model Endpoint Serving is now available. Open the <a href="/ml/endpoints/{serving_endpoint_name}">Model Serving Endpoint page</a> for more details.')

Build Forecast and Continuous Improvement

After developing and deploying your machine learning model, the final step is to utilize the model to make predictions. This process involves sending new data to the model endpoint and interpreting the predictions returned by the model.

  • Generate Forecasts: Use your model to predict future values based on historical data.
  • Validate and Iterate: Continuously validate your model against new data and iterate to improve accuracy and reliability.
#Predict using Served Model
import requests
import json
from datetime import date, datetime

# Custom encoder for handling date and datetime objects in JSON serialization
class CustomJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        return json.JSONEncoder.default(self, obj)

# Prepare data payload from DataFrame for model invocation
data_payload = {"dataframe_records": history_pd.to_dict(orient='records')}
data_json = json.dumps(data_payload, cls=CustomJSONEncoder)

# Setup headers for the POST request
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_TOKEN}",
}

# Endpoint URL for model invocation
serving_endpoint_url = f"{API_ROOT}/serving-endpoints/{serving_endpoint_name}/invocations"

# Call the serving endpoint to obtain predictions
response = requests.post(serving_endpoint_url, headers=headers, data=data_json)

# Check and display the response
if response.status_code == 200:
    predictions = response.json()
    print("Predictions:", predictions)
else:
    # Surface the status code and body so the failure can be diagnosed
    print(f"Failed to make predictions: {response.status_code} {response.text}")

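
To see what is actually sent over the wire, the sketch below serializes a couple of rows into the same dataframe_records shape used in the request above, converting dates to ISO strings. The helper names and sample rows are hypothetical, and no request is made.

```python
import json
from datetime import date

def encode_dates(obj):
    """Convert date and datetime objects to ISO 8601 strings for JSON."""
    if isinstance(obj, date):  # datetime is a subclass of date
        return obj.isoformat()
    raise TypeError(f"Object of type {type(obj).__name__} is not JSON serializable")

def build_payload(records):
    """Serialize rows into the {"dataframe_records": [...]} request shape."""
    return json.dumps({"dataframe_records": records}, default=encode_dates)

# Hypothetical rows in the ds / y layout used throughout this guide
sample_rows = [
    {"ds": date(2024, 4, 27), "y": 120.0},
    {"ds": date(2024, 4, 28), "y": 135.5},
]
payload = build_payload(sample_rows)
print(payload)
```

Printing the payload before sending it is a quick way to confirm that dates were serialized as strings and that column names match the model signature.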
#Visualize the Predictions
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta

# Convert predictions JSON to DataFrame
pred_df = pd.json_normalize(predictions['predictions'])

# Ensure 'ds' columns are datetime objects for merging and filtering
history_pd['ds'] = pd.to_datetime(history_pd['ds'])
pred_df['ds'] = pd.to_datetime(pred_df['ds'])

# Merge historical and prediction data on 'ds'
combined_df = pd.merge(left=pred_df, right=history_pd, on='ds', how='left')

# Filter for data from the last 60 days
combined_df = combined_df[combined_df['ds'] >= history_pd['ds'].max() - timedelta(days=60)]

# Plotting setup
plt.figure(figsize=(12, 6))

# Plot actual values and predictions
plt.plot(combined_df['ds'], combined_df['y'], label='Actual', color='black')
plt.plot(combined_df['ds'], combined_df['yhat'], label='Predicted', color='blue')

# Indicate prediction uncertainty
plt.fill_between(combined_df['ds'], combined_df['yhat_lower'], combined_df['yhat_upper'], color='gray', alpha=0.2)

# Finalize plot
plt.title('Model Predictions vs Actual Values')
plt.xlabel('Date')
plt.ylabel('Value')
plt.legend()
plt.grid(True)

# Display the plot
plt.show()
DatabricksGuide_2-1715968440657.png

In conclusion, successfully forecasting on Databricks hinges on a thorough understanding of your data, meticulous preparation, strategic model selection, and continuous improvement through iteration, logging, and serving. By integrating these practices into your workflow, you can develop powerful forecasting models that drive informed decisions for your organization.
