05-29-2025 07:54 AM
ISSUE -- Not able to run PipelineModel.load() on a Unity Catalog cluster
ERROR -- [JVM_ATTRIBUTE_NOT_SUPPORTED] Attribute `sparkContext` is not supported in Spark Connect as it depends on the JVM. If you need to use this attribute, do not use Spark Connect when creating your session. Visit https://spark.apache.org/docs/latest/sql-getting-started.html#starting-point-sparksession for creating regular Spark Session in detail.
ANALYSIS --
In Databricks, there are two different SparkSession types:
<class 'pyspark.sql.connect.session.SparkSession'> (used in Unity Catalog-enabled clusters with Spark Connect)
<class 'pyspark.sql.session.SparkSession'> (used in standard clusters)
Why This Happens
Unity Catalog clusters often use Spark Connect, which is a client-server architecture where the client uses pyspark.sql.connect.SparkSession.
Non-Unity Catalog clusters use the traditional monolithic SparkSession (pyspark.sql.SparkSession).
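You can confirm which session type a given cluster hands you with a quick check (a minimal sketch; Databricks notebooks pre-create the spark variable):
# Print the concrete SparkSession class the notebook is using
print(type(spark))
try:
    from pyspark.sql.connect.session import SparkSession as ConnectSparkSession
    # True on Spark Connect / Unity Catalog clusters, False on classic clusters
    print("Spark Connect session:", isinstance(spark, ConnectSparkSession))
except ImportError:
    print("pyspark.sql.connect not available -- classic JVM-backed session")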
When we run the code on a standard cluster and read the model file from mounts, it works.
On a Unity Catalog cluster, however, the session is created via Spark Connect, and the code below fails:
from pyspark.sql import SparkSession
from pyspark.ml.classification import RandomForestClassificationModel
from datetime import datetime
from pyspark.ml import PipelineModel

# Load the model from a Unity Catalog volume
model_path = "<volumePath>/sparkML_pipeline2022_2_0.model"
pipeline_model = PipelineModel.load(model_path)  # raises JVM_ATTRIBUTE_NOT_SUPPORTED on Spark Connect
The code does run on a single-user cluster, but that is not recommended since multiple users will be sharing the same cluster.
Please let me know if anyone can help fix this issue.
06-03-2025 06:27 PM
This is a well-known limitation when working with Unity Catalog clusters and Spark ML models. The issue occurs because Spark Connect (used in Unity Catalog clusters) doesn't support direct JVM access, which PipelineModel.load() requires.
Here is a solution to resolve this:
Solution 1: Use MLflow for Model Management:
import mlflow
import mlflow.spark
from mlflow.tracking import MlflowClient

# If your model isn't already in MLflow, register it first:
# (Run this once on a standard cluster)
"""
with mlflow.start_run():
    mlflow.spark.log_model(pipeline_model, "spark_pipeline_model")
    # Register the model
    client = MlflowClient()
    model_version = mlflow.register_model(
        f"runs:/{mlflow.active_run().info.run_id}/spark_pipeline_model",
        "spark_pipeline_classifier"
    )
"""

# Load the model in the Unity Catalog cluster:
model_uri = "models:/spark_pipeline_classifier/latest"  # or a specific version
loaded_model = mlflow.spark.load_model(model_uri)

# Use the model for predictions
predictions = loaded_model.transform(test_df)
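Note: if your workspace registers models in the Unity Catalog model registry rather than the workspace registry, you may also need to point MLflow at UC and use a three-level model name (the catalog and schema below are placeholders):
import mlflow
mlflow.set_registry_uri("databricks-uc")  # route registry calls to Unity Catalog
# UC registry uses <catalog>.<schema>.<model> names with version numbers or aliases
model_uri = "models:/main.default.spark_pipeline_classifier/1"
loaded_model = mlflow.spark.load_model(model_uri)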
06-04-2025 03:21 AM
Thanks for your reply LRALVA. When I tried to run mlflow.spark.log_model(pipeline_model, "spark_pipeline_model") on my already-saved model (a random forest saved a long time back), log_model gave me an error that the model is not a Spark flavor. So I tried mlflow.sklearn.log_model(pipeline_model, "spark_pipeline_model"), which worked, and I was able to register the model under Models. But when I load it back and run the transform function on it, it gives me this error:
AttributeError: 'str' object has no attribute 'transform'
Code I am running to load it:
import mlflow
import mlflow.spark
from mlflow.tracking import MlflowClient
# Load the model in Unity Catalog cluster:
model_uri = "models:/sparkML_rf2022_2_0/latest" # or specific version
loaded_model = mlflow.sklearn.load_model(model_uri)
df = spark.read.parquet('<data/path>')
# Use the model for predictions
predictions = loaded_model.transform(df)
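A quick diagnostic after the load call shows what actually came back (the AttributeError above indicates the loader returned a string, not a model):
# Check what load_model actually returned before calling transform()
print(type(loaded_model))  # the error above suggests this prints <class 'str'>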
06-04-2025 09:21 AM
The issue you're encountering is due to a mismatch between model flavors and loading methods.
When you used mlflow.sklearn.log_model() on a Spark ML PipelineModel, you logged a Spark ML model under the scikit-learn flavor. The flavor metadata no longer matches the actual object, which is why loading returns the wrong type.
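Before re-logging, you can verify how the model was actually recorded by inspecting its flavors (a small check, assuming an MLflow version that provides mlflow.models.get_model_info):
import mlflow
# A Spark ML pipeline should list a "spark" flavor here; seeing only
# "sklearn"/"python_function" confirms it was logged under the wrong flavor
info = mlflow.models.get_model_info("models:/sparkML_rf2022_2_0/latest")
print(list(info.flavors.keys()))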
Solution: Re-log the Model with Correct Flavor
First, determine what type of model you actually have:
from pyspark.ml import PipelineModel
import mlflow
import mlflow.spark

# Load your original model
model_path = "<volumePath>/sparkML_pipeline2022_2_0.model"
pipeline_model = PipelineModel.load(model_path)

# Check the model type
print(f"Model type: {type(pipeline_model)}")
print(f"Model stages: {[type(stage).__name__ for stage in pipeline_model.stages]}")

# Log it correctly as a Spark model
with mlflow.start_run():
    try:
        # This should work for Spark ML models
        mlflow.spark.log_model(pipeline_model, "spark_pipeline_model")
        print("Successfully logged as Spark model")
    except Exception as e:
        print(f"Error logging as Spark model: {e}")
        # If it fails, the model might have compatibility issues
If the above fails, try this alternative approach:
# Alternative: wrap the Spark model behind the pyfunc flavor
import mlflow.pyfunc
from pyspark.sql import SparkSession

class SparkModelWrapper(mlflow.pyfunc.PythonModel):
    def __init__(self, spark_model):
        self.spark_model = spark_model

    def predict(self, context, model_input):
        # Convert pandas DataFrame to Spark DataFrame if needed
        if hasattr(model_input, 'toPandas'):
            # Already a Spark DataFrame
            return self.spark_model.transform(model_input)
        else:
            # Convert pandas to a Spark DataFrame; the pyfunc context has no
            # spark_session attribute, so fetch the active session instead
            spark = SparkSession.builder.getOrCreate()
            spark_df = spark.createDataFrame(model_input)
            result = self.spark_model.transform(spark_df)
            return result.toPandas()

# Log the wrapped model
with mlflow.start_run():
    mlflow.pyfunc.log_model(
        "spark_pipeline_model",
        python_model=SparkModelWrapper(pipeline_model),
        artifacts={"model_path": model_path}
    )
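A minimal usage sketch for the wrapped model (the run ID and column names below are placeholders for illustration):
import mlflow.pyfunc
import pandas as pd

run_id = "<run_id>"  # placeholder: the run that logged the wrapped model
loaded = mlflow.pyfunc.load_model(f"runs:/{run_id}/spark_pipeline_model")

# pyfunc models expose predict(), which dispatches to SparkModelWrapper.predict();
# pass a pandas DataFrame whose columns match the pipeline's expected features
pdf = pd.DataFrame({"feature1": [0.1], "feature2": [1.2]})  # placeholder columns
predictions = loaded.predict(pdf)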