‎05-17-2024 10:58 AM - edited ‎08-12-2024 11:15 AM
This guide offers a detailed, step-by-step approach for building a forecasting model on Databricks. It covers data selection and preparation, model training and evaluation, model logging with MLflow, and deployment with Databricks Model Serving.
It assumes you have a basic familiarity with Databricks notebooks, PySpark, and common machine learning concepts.
This guide is structured to provide a step-by-step approach to advanced forecasting and model deployment, incorporating best practices and leveraging modern tools such as MLflow. By the end of this guide, you will be equipped with the knowledge to:
Data selection is a crucial first step for successful time series forecasting. Here is a breakdown of the key aspects to consider:
A successful forecasting model relies on a well-structured dataset. For this tutorial, your data should have at least the following columns:
Ensure the data is at the correct frequency (daily, weekly, etc.) and that you have enough data to capture seasonal patterns or trends relevant to your goals.
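As a rough illustration of the frequency check described above, the sketch below lists the calendar days missing from a daily series. The function name `find_missing_dates` and the sample dates are hypothetical, and the sketch assumes one row per day; it is not part of any Databricks API.

```python
from datetime import date, timedelta

def find_missing_dates(dates):
    """Return the calendar days absent between the first and last observed date."""
    observed = set(dates)
    current, last = min(observed), max(observed)
    missing = []
    while current <= last:
        if current not in observed:
            missing.append(current)
        current += timedelta(days=1)
    return missing

# Hypothetical sample: 3 January is absent from an otherwise daily series
sample = [date(2024, 1, 1), date(2024, 1, 2), date(2024, 1, 4), date(2024, 1, 5)]
gaps = find_missing_dates(sample)
print(gaps)
```

A non-empty result suggests the series needs gap handling (or a coarser frequency) before it is passed to a forecasting model.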
This section of the guide assumes you have the following:
To get started with Databricks, you need to start your starter warehouse. This compute resource lets you query and explore data on Databricks. You can also follow this guide with a serverless warehouse.
Note: Only users with workspace administrator permissions can start the starter warehouse. If you don’t have admin permissions, please contact your administrator to get access to the starter warehouse or to create a new serverless warehouse.
To connect your Databricks workspace to your cloud storage, you need to create an external location. An external location is an object that combines a cloud storage path with the credential that authorizes access to the storage path.
To verify external locations have functioning connections, do the following:
For help, contact onboarding-help@databricks.com or your Databricks account team directly.
Now that your workspace has a connection to your S3 bucket, you can add your data.
Part of this step is choosing where to put your data. Databricks has a three-level namespace that organizes your data (catalog.schema.table). For this exercise, you import the data into the default catalog named after your workspace.
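To make the three-level namespace concrete, here is a minimal sketch that joins a catalog, schema, and table into one fully qualified name. The helper `qualified_table_name` and the example names are hypothetical; the sketch assumes backtick quoting, which allows names containing characters such as hyphens.

```python
def qualified_table_name(catalog, schema, table):
    """Join the three namespace levels into catalog.schema.table, quoting each part."""
    parts = (catalog, schema, table)
    if any(not part for part in parts):
        raise ValueError("catalog, schema, and table must all be non-empty")
    # An embedded backtick is escaped by doubling it
    return ".".join("`" + part.replace("`", "``") + "`" for part in parts)

# Hypothetical names: replace with your own catalog, schema, and table
full_name = qualified_table_name("my_workspace", "default", "daily_sales")
print(full_name)
```

The resulting string can be used wherever a table name is expected, for example in the `FROM` clause of a query passed to `spark.sql`.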
You can now use Catalog Explorer in your workspace to see your data in Databricks.
Once you have selected your data, there are two primary ways to build a forecasting model on Databricks.
Databricks AutoML is a valuable tool for getting started with time series forecasting on Databricks. It streamlines the process, reduces development time, and provides a solid baseline model. Use the following steps to set up an AutoML Model.
You can set up a forecasting problem using the AutoML UI with the following steps:
Monitor Endpoint Status: The Serving endpoints page will display your newly created endpoint. The initial state will likely be "Pending" as deployment progresses. You can refresh the page to monitor the endpoint's status until it becomes "Active."
#Install Dependencies
!pip install prophet
!pip install databricks-sdk==0.12.0
!pip install mlflow
!pip install grpcio
!pip install grpcio-status
!pip install -U "pandas<2.0.0"
dbutils.library.restartPython()
#Select Data
query = "SELECT <date>, <metric>, <metric_value> FROM <catalog_name>.<schema_name>.<table_name> WHERE is_customer = 'true' AND <date> > '2020-01-01' ORDER BY <date> DESC" #Update the query to get data from your table
df = spark.sql(query)
# Choose a single metric to make the calculations simpler
df = df.filter(df.<metric> == "xxxxxxxxxxxxxxxxxxxxx") #Replace <metric> with your metric name
df.show(5)
Effectively preparing your data is a foundational step in the forecasting process. Proper preparation ensures the accuracy and reliability of your model's predictions.
from pyspark.sql.functions import col, lit
# Dropping rows with missing values in the 'metric_value' column
cleaned_df = df.na.drop(subset=["<metric_value>"]) #Replace <metric_value> with your metric value column name
cleaned_df.show(5)
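# Calculating the IQR and defining bounds for outliers (the third argument, 0.05, is the relative error, so the quartiles are approximate)
quartiles = cleaned_df.approxQuantile("<metric_value>", [0.25, 0.75], 0.05) #Replace <metric_value> with your metric value column name
IQR = quartiles[1] - quartiles[0]
lower_bound = 0 # Fixed at 0 rather than Q1 - 1.5 * IQR, so the filter below also drops zero and negative values
upper_bound = quartiles[1] + 1.5 * IQR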
# Filtering out outliers
no_outliers_df = cleaned_df.filter(
(col("<metric_value>") > lit(lower_bound)) #Replace <metric_value> with your metric value column name
& (col("<metric_value>") <= lit(upper_bound)) #Replace <metric_value> with your metric value column name
)
# Showing the updated DataFrame
no_outliers_df.show(5)
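The bounds used in the filter above can be checked by hand on a small sample. This sketch is an assumption-laden illustration: it uses hypothetical values and Python's exact `statistics.quantiles` instead of Spark's approximate `approxQuantile`, so the quartiles may differ slightly from what Spark returns on real data.

```python
from statistics import quantiles

values = [10, 12, 11, 13, 12, 95, 0, 11]  # hypothetical metric values

# Exact quartiles; the "inclusive" method treats the sample as the full population
q1, _, q3 = quantiles(values, n=4, method="inclusive")
iqr = q3 - q1
lower_bound = 0                  # same fixed lower bound as the Spark code
upper_bound = q3 + 1.5 * iqr

# Keep values strictly above the lower bound and at or below the upper bound
kept = [v for v in values if lower_bound < v <= upper_bound]
print(upper_bound, kept)
```

Here the spike of 95 and the zero reading are both removed, while the values clustered around 10 to 13 are kept.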
The model you choose will depend on the nature of your data and the specific forecasting problem you’re trying to solve. Consider data characteristics such as frequency (daily vs. weekly), granularity (hourly vs. daily sales), seasonality, and external factors such as holidays and promotions. Machine learning methods:
For the scope of this tutorial, Prophet will serve as our primary model. Prophet stands out for its user-friendly nature and robust handling of various time series forecasting challenges, making it a versatile option for a wide range of applications.
When deciding on which model to use, evaluate your dataset's characteristics carefully:
Each model has its strengths, and sometimes a combination or ensemble approach might yield the best results. Trial and experimentation with each model on your specific dataset are essential steps to identify the most suitable model for your forecasting goals.
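One common way to run the experimentation described above is to hold out the most recent observations and compare each candidate model on them. The sketch below shows only the split; the helper `chronological_split` and the sample rows are hypothetical, and it assumes dates are ISO-formatted strings so that they sort in calendar order.

```python
def chronological_split(rows, holdout):
    """Sort (date, value) rows by date and hold out the last `holdout` rows for testing."""
    if not 0 < holdout < len(rows):
        raise ValueError("holdout must be between 1 and len(rows) - 1")
    ordered = sorted(rows, key=lambda row: row[0])
    return ordered[:-holdout], ordered[-holdout:]

# Hypothetical rows, deliberately out of order
rows = [("2024-01-03", 12.0), ("2024-01-01", 10.0), ("2024-01-04", 13.0), ("2024-01-02", 11.0)]
train, test = chronological_split(rows, holdout=1)
print(train)
print(test)
```

Splitting by time rather than at random keeps future observations out of the training set, which matters for time series.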
from prophet import Prophet
from pyspark.sql.functions import col, to_date
# Prophet requires at minimum two columns: ds (date) and y (value)
train_df = no_outliers_df.select(
    to_date(col("<date>")).alias("ds"),
    col("<metric>"),
    col("<metric_value>").cast("double").alias("y"),
).orderBy(col("ds").desc())
# Set model parameters
prophet_model = Prophet(
    interval_width=0.95,
    growth='linear',
    daily_seasonality=True,
    weekly_seasonality=True,
    yearly_seasonality=True,
    seasonality_mode='additive'
)
# fit the model to historical data
history_pd = train_df.toPandas()
prophet_model.fit(history_pd)
To effectively utilize Prophet for forecasting in Databricks, follow these concise steps:
# The model was already fitted to history_pd in the previous step. Prophet raises an error if fit() is called again on the same object, so reuse the fitted model here.
# Define a dataset with the historical dates plus 10 days beyond the last available date
future_pd = prophet_model.make_future_dataframe(
    periods=10,
    freq='D',
    include_history=True
)
# Forecast
forecast_pd = prophet_model.predict(future_pd)
display(forecast_pd)
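To show which dates a 10-day horizon adds, the sketch below generates the days that follow the last observed date. It is a plain-Python illustration, not Prophet's implementation; the helper `future_dates` and the example date are hypothetical, and it assumes a daily frequency.

```python
from datetime import date, timedelta

def future_dates(last_date, periods):
    """Return `periods` consecutive daily dates that follow `last_date`."""
    return [last_date + timedelta(days=offset) for offset in range(1, periods + 1)]

# Hypothetical last observed date, with a 3-day horizon for brevity
horizon = future_dates(date(2024, 4, 28), periods=3)
print(horizon)
```

With `include_history=True`, the forecast frame contains these future dates in addition to every historical date, so the forecast covers both.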
Evaluate the model by comparing its forecasts to actual data and calculating performance metrics such as Mean Squared Error (MSE). Performance metrics:
The choice between these metrics should be informed by your specific forecasting objectives and the nature of the data. In practice, assessing model performance might involve looking at multiple metrics to get a comprehensive view of the model's accuracy.
import pandas as pd
from sklearn.metrics import mean_squared_error, mean_absolute_error
from math import sqrt
# Align actuals and predictions on date before scoring: history_pd is sorted newest-first,
# while forecast_pd is sorted oldest-first, so a row-by-row comparison would pair the wrong dates
actuals = history_pd[['ds', 'y']].copy()
actuals['ds'] = pd.to_datetime(actuals['ds'])
eval_pd = actuals.merge(forecast_pd[['ds', 'yhat']], on='ds', how='inner')
eval_pd = eval_pd[eval_pd['ds'] < pd.to_datetime('2024-04-29')] #Update it to max date on your dataset
actuals_pd = eval_pd['y']
predicted_pd = eval_pd['yhat']
# Calculate evaluation metrics
mae = mean_absolute_error(actuals_pd, predicted_pd)
mse = mean_squared_error(actuals_pd, predicted_pd)
rmse = sqrt(mse)
# Print the metrics
print(f"MAE: {mae}")
print(f"MSE: {mse}")
print(f"RMSE: {rmse}")
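For readers who want to see what these metrics measure, here is a small plain-Python sketch that computes MAE, MSE, and RMSE by hand. The function `regression_errors` and the numbers are hypothetical; in practice the scikit-learn functions used above do the same job.

```python
from math import sqrt

def regression_errors(actuals, predictions):
    """Compute MAE, MSE, and RMSE for two equally long sequences of numbers."""
    if not actuals or len(actuals) != len(predictions):
        raise ValueError("inputs must be non-empty and of equal length")
    errors = [a - p for a, p in zip(actuals, predictions)]
    mae = sum(abs(e) for e in errors) / len(errors)   # average absolute error
    mse = sum(e * e for e in errors) / len(errors)    # average squared error
    return {"mae": mae, "mse": mse, "rmse": sqrt(mse)}

# Hypothetical actuals and forecasts: the errors are 10, -5, and 0
scores = regression_errors([100, 110, 120], [90, 115, 120])
print(scores)
```

Because MSE squares each error, the single miss of 10 contributes four times as much as the miss of 5, which is why RMSE is more sensitive to large errors than MAE.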
Model Logging is critical for tracking the performance and changes to models over time, ensuring reproducibility and accountability. Techniques include:
Logging models with MLflow offers several advantages:
For the purpose of this tutorial, we demonstrate how to efficiently log a Prophet model using MLflow, capturing essential information that supports further analysis and model deployment.
import mlflow
from mlflow.tracking import MlflowClient
from mlflow.models.signature import infer_signature
from mlflow.pyfunc import PythonModel, log_model
# Define the catalog, schema, and model name for organizing the model within the MLflow model registry
catalog = "catalog_name" #Update it to your catalog name
schema = "schema_name" #Update it to your schema name
model_name = "forecastingmodel" #Update it to your model name
class MyPythonModel(mlflow.pyfunc.PythonModel):
    def __init__(self, model):
        self.model = model

    def predict(self, context, model_input):
        # The forecast horizon is fixed at 10 days; model_input is not used
        future_pd = self.model.make_future_dataframe(periods=10, freq="D", include_history=True)
        forecast_pd = self.model.predict(future_pd)
        return forecast_pd[["ds", "yhat", "yhat_upper", "yhat_lower"]]

wrapped_model = MyPythonModel(prophet_model)
# Start an MLflow run to track metrics and artifacts
with mlflow.start_run(run_name="Prophet Model Run") as run:
    input_example = history_pd.head()[["ds", "y"]]
    # Keep only the columns that the wrapped model returns, so the signature matches its output
    output_example = prophet_model.predict(input_example)[["ds", "yhat", "yhat_upper", "yhat_lower"]]
    # Log calculated metrics
    mlflow.log_metric("mae", mae)
    mlflow.log_metric("rmse", rmse)
    print(output_example)
    # Infer the signature of the machine learning model
    signature = infer_signature(input_example, output_example)
    # Update dependencies in the default conda environment
    env = mlflow.pyfunc.get_default_conda_env()
    env['dependencies'][-1]['pip'] += ["prophet==1.1.5"]
    env['dependencies'][-1]['pip'] += ["pandas==1.5.3"]
    env['dependencies'][-1]['pip'] += ["pyspark==3.5.1"]
    env['dependencies'][-1]['pip'] += ["grpcio==1.62.0"]
    env['dependencies'][-1]['pip'] += ["grpcio_status==1.62.0"]
    # Log the trained model to MLflow with the inferred signature
    model_log = log_model(
        artifact_path="forecasting_model",
        python_model=wrapped_model,
        signature=signature,
        input_example=input_example,
        registered_model_name=f"{catalog}.{schema}.{model_name}",
        conda_env=env
    )
# Retain the run ID for use with other MLflow functionalities
run_id = run.info.run_id
# Get the latest model version
def get_latest_model_version(model_name: str = None):
    latest_version = 1
    mlflow_client = MlflowClient()
    for mv in mlflow_client.search_model_versions(f"name='{model_name}'"):
        version_int = int(mv.version)
        if version_int > latest_version:
            latest_version = version_int
    return latest_version

model_version = get_latest_model_version(f"{catalog}.{schema}.{model_name}")
Deploying machine learning models into production on Databricks can be achieved through two primary methods: MLflow for batch inference and prediction, and Databricks Model Serving for real-time inference. Each serves different use cases based on the requirement for real-time responses and the scale of data processing.
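For the batch path, a registered model is usually addressed by a `models:/<name>/<version>` URI. The sketch below only builds that string; the helper `registered_model_uri` is hypothetical, and the catalog, schema, and model names are the placeholders used earlier in this guide.

```python
def registered_model_uri(catalog, schema, model_name, version):
    """Build a models:/ URI for a specific version of a registered model."""
    full_name = f"{catalog}.{schema}.{model_name}"
    return f"models:/{full_name}/{int(version)}"

# Placeholder names from this guide: replace with your own
uri = registered_model_uri("catalog_name", "schema_name", "forecastingmodel", 1)
print(uri)
```

Passing a URI of this form to `mlflow.pyfunc.load_model` returns a model object whose `predict` method can be called from a notebook or scheduled job, without a serving endpoint.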
import mlflow, os
import requests, json
from databricks.sdk import WorkspaceClient
from databricks.sdk.service.serving import EndpointCoreConfigInput, ServedModelInput
from mlflow.deployments import get_deploy_client
serving_endpoint_name = "forecasting_model_serving"
# Get the API endpoint and token for the current notebook context
API_ROOT = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get()
API_TOKEN = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().getOrElse(None)
client = get_deploy_client("databricks")
# Check if the endpoint already exists
existing_endpoint = next(
(e for e in client.list_endpoints() if e['name'] == serving_endpoint_name), None
)
# Define the endpoint configuration
endpoint_config = {
    "served_entities": [
        {
            "entity_name": f"{catalog}.{schema}.{model_name}",
            "entity_version": model_version,
            "workload_size": "Small",
            "workload_type": "CPU",
            "scale_to_zero_enabled": True
        }
    ]
}
if existing_endpoint is not None:
    # Update the existing endpoint
    endpoint = client.update_endpoint(
        endpoint=serving_endpoint_name,
        config=endpoint_config
    )
else:
    # Create a new endpoint if it does not exist
    endpoint = client.create_endpoint(
        name=serving_endpoint_name,
        config=endpoint_config
    )
# Wait for Endpoint to be ready
import time
from datetime import datetime, timedelta
# Define the maximum wait time (20 minutes)
max_wait_time = timedelta(minutes=20)
deadline = datetime.now() + max_wait_time
# Function to check the status of the endpoint
def check_endpoint_status(client, endpoint_name):
    endpoints = client.list_endpoints()
    for endpoint in endpoints:
        if endpoint['name'] == endpoint_name:
            return endpoint
    return None

# Wait for the endpoint to be ready or until the deadline is reached
while datetime.now() < deadline:
    endpoint_info = check_endpoint_status(client, serving_endpoint_name)
    if endpoint_info is not None and str(endpoint_info['state']['ready']).lower() == 'ready' and str(endpoint_info['state']['config_update']).lower() != 'in_progress':
        print(f"Endpoint {serving_endpoint_name} is ready.")
        break
    else:
        print(f"Waiting for endpoint {serving_endpoint_name} to be ready. Current status: {endpoint_info['state'] if endpoint_info else 'Not Found'}")
        time.sleep(60) # Wait for 60 seconds before checking again
else:
    # This branch runs only if the loop ended without a break, that is, on timeout
    print(f"Timeout reached. Endpoint {serving_endpoint_name} may not be ready.")
displayHTML(
    f'Your Model Endpoint Serving is now available. Open the <a href="/ml/endpoints/{serving_endpoint_name}">Model Serving Endpoint page</a> for more details.'
)
After developing and deploying your machine learning model, the final step is to utilize the model to make predictions. This process involves sending new data to the model endpoint and interpreting the predictions returned by the model.
#Predict using Served Model
import requests
import json
from datetime import date, datetime
# Custom encoder for handling date and datetime objects in JSON serialization
class CustomJSONEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, (datetime, date)):
            return obj.isoformat()
        return json.JSONEncoder.default(self, obj)

# Prepare data payload from DataFrame for model invocation
data_payload = {"dataframe_records": history_pd.to_dict(orient='records')}
data_json = json.dumps(data_payload, cls=CustomJSONEncoder)
# Set up headers for the POST request
headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {API_TOKEN}",
}
# Endpoint URL for model invocation
serving_endpoint_url = f"{API_ROOT}/serving-endpoints/{serving_endpoint_name}/invocations"
# API call to invoke the served model and obtain predictions
response = requests.post(serving_endpoint_url, headers=headers, data=data_json)
# Check and display the response
if response.status_code == 200:
    predictions = response.json()
    print("Predictions:", predictions)
else:
    # Include the status code and body so the cause of the failure is visible
    print(f"Failed to make predictions: {response.status_code} {response.text}")
#Visualize the Predictions
import pandas as pd
import matplotlib.pyplot as plt
from datetime import datetime, timedelta
# Convert predictions JSON to DataFrame
pred_df = pd.json_normalize(predictions['predictions'])
# Ensure 'ds' columns are datetime objects for merging and filtering
history_pd['ds'] = pd.to_datetime(history_pd['ds'])
pred_df['ds'] = pd.to_datetime(pred_df['ds'])
# Merge historical and prediction data on 'ds'
combined_df = pd.merge(left=pred_df, right=history_pd, on='ds', how='left')
# Filter for data from the last 60 days
combined_df = combined_df[combined_df['ds'] >= history_pd['ds'].max() - timedelta(days=60)]
# Plotting setup
plt.figure(figsize=(12, 6))
# Plot actual values and predictions
plt.plot(combined_df['ds'], combined_df['y'], label='Actual', color='black')
plt.plot(combined_df['ds'], combined_df['yhat'], label='Predicted', color='blue')
# Indicate prediction uncertainty
plt.fill_between(combined_df['ds'], combined_df['yhat_lower'], combined_df['yhat_upper'], color='gray', alpha=0.2)
# Finalize plot
plt.title('Model Predictions vs Actual Values')
plt.xlabel('Date')
plt.ylabel('Value')
plt.legend()
plt.grid(True)
# Display the plot
plt.show()
In conclusion, successfully forecasting on Databricks hinges on a thorough understanding of your data, meticulous preparation, strategic model selection, and continuous improvement through iteration, logging, and serving. By integrating these practices into your workflow, you can develop powerful forecasting models that drive informed decisions for your organization.
Truly well articulated.
@DatabricksGuide , Really helpful and informative