Hey community members,
I am new to Databricks and was building a simple DLT pipeline that loads data from S3 and runs an Isolation Forest prediction to detect anomalies. The model is stored in the Model Registry. Here's the code for the pipeline:
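import dlt
import mlflow
from pyspark.sql import functions as F
from pyspark.sql.functions import col, struct

# `source` and `model_uri` are set earlier (not shown).

# Ingest the raw JSON files from S3 with Auto Loader.
@dlt.table
def trucklocation():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .option("cloudFiles.inferColumnTypes", True)
        .load(f"{source}/trucklocation")
        .select(
            F.current_timestamp().alias("processing_time"),
            "*"
        )
    )

# Wrap the registered model as a Spark UDF.
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)

# Score each row on the 'velocity' column.
@dlt.table
def velocity_predictions():
    return (
        dlt.read("trucklocation")
        .withColumn('predictions', loaded_model_udf(struct(*map(col, ['velocity']))))
    )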
The pipeline fails with the following error:
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 804.0 failed 4 times, most recent failure: Lost task 0.3 in stage 804.0 (TID 1285) (10.55.136.232 executor 0): org.apache.spark.api.python.PythonException: 'AttributeError: 'IsolationForest' object has no attribute 'n_features_''. Full traceback below:
Traceback (most recent call last):
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-a35783aa-d900-4f51-9233-f8eb37babc87/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py", line 1293, in udf
    os.kill(scoring_server_proc.pid, signal.SIGTERM)
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-a35783aa-d900-4f51-9233-f8eb37babc87/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py", line 1080, in _predict_row_batch
    result = predict_fn(pdf)
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-a35783aa-d900-4f51-9233-f8eb37babc87/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py", line 1274, in batch_predict_fn
    return loaded_model.predict(pdf)
  File "/local_disk0/.ephemeral_nfs/envs/pythonEnv-a35783aa-d900-4f51-9233-f8eb37babc87/lib/python3.9/site-packages/mlflow/pyfunc/__init__.py", line 427, in predict
    return self._predict_fn(data)
  File "/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_iforest.py", line 314, in predict
    is_inlier[self.decision_function(X) < 0] = -1
  File "/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_iforest.py", line 347, in decision_function
    return self.score_samples(X) - self.offset_
  File "/databricks/python/lib/python3.9/site-packages/sklearn/ensemble/_iforest.py", line 379, in score_samples
    if self.n_features_ != X.shape[1]:
AttributeError: 'IsolationForest' object has no attribute 'n_features_'
I also tried running the prediction directly, outside the pipeline, and it worked fine:
loaded_model_udf = mlflow.pyfunc.spark_udf(spark, model_uri=model_uri)
df_location = spark.read.format('json').option("inferSchema", "true").load(s3path)
df = df_location.withColumn('predictions', loaded_model_udf(struct(*map(col, ['velocity']))))
Any help on why the pipeline would fail is appreciated.
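
One thing that may be worth checking, going by the traceback: the failing frames come from the scikit-learn copy under /databricks/python/lib, and that copy reads `n_features_`. Newer scikit-learn releases set `n_features_in_` on the fitted estimator instead, so a plausible (unconfirmed) cause is that the model was logged with a newer scikit-learn than the one the pipeline cluster uses. The sketch below compares the version pinned in a model's requirements.txt with the version installed in the current environment. The helper names `pinned_version` and `installed_version` are made up for illustration, and it assumes the model artifact includes a requirements.txt with an `==` pin, which MLflow normally writes when a model is logged.

```python
import re
from importlib import metadata
from typing import Optional


def pinned_version(requirements_text: str, package: str) -> Optional[str]:
    """Return the version pinned with '==' for `package`, or None if there is no such pin."""
    pattern = re.compile(
        rf"^\s*{re.escape(package)}\s*==\s*([^\s;#]+)",
        re.IGNORECASE | re.MULTILINE,
    )
    match = pattern.search(requirements_text)
    return match.group(1) if match else None


def installed_version(package: str) -> Optional[str]:
    """Return the version of `package` installed in this environment, or None if absent."""
    try:
        return metadata.version(package)
    except metadata.PackageNotFoundError:
        return None


# Hypothetical usage inside the pipeline notebook (the path is an assumption):
#   with open("<path to the model's requirements.txt>") as f:
#       logged = pinned_version(f.read(), "scikit-learn")
#   print("logged with:", logged, "| running with:", installed_version("scikit-learn"))
```

If the two versions differ, installing the logged version in the pipeline notebook, or retraining the model on the same runtime the pipeline uses, would be the obvious things to try. Depending on the MLflow version, `mlflow.pyfunc.spark_udf` may also accept an `env_manager` argument that restores the model's own environment, but whether that is supported in a DLT pipeline is something to verify against the documentation.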