Data Engineering

DLT pipeline MLFlow UDF error

Feather
New Contributor III

I am running this notebook via a DLT pipeline in preview mode.

Everything works up until the predictions table, which should be created by a registered model running inference on the gold table.


This is the error: com.databricks.spark.safespark.UDFException: INVALID_ARGUMENT: No module named 'importlib_metadata'
 
# Databricks notebook source
# MAGIC %pip install mlflow
# MAGIC %pip install importlib_metadata

# COMMAND ----------

import mlflow
import importlib_metadata
model_uri = f"models:/soybeans_volatility/1"

# create spark user-defined function for model prediction.
predict = mlflow.pyfunc.spark_udf(spark, model_uri, result_type="double", env_manager='virtualenv')


# COMMAND ----------

import dlt
from pyspark.sql.functions import avg, max, min, col, lag, count, when, struct, from_unixtime, unix_timestamp
from pyspark.sql.window import Window

path_to_uc_external_location = "s3://gfy-databricks-storage/data/barchart/soybeans/"

@dlt.table(name="soybeans_bronze", table_properties={"quality": "bronze"})
def table_name():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load(f"{path_to_uc_external_location}")
    )


@dlt.table(name="soybeans_silver", table_properties={"quality": "silver"})
def create_silver_table():
    df = dlt.read("soybeans_bronze")

    cleaned_df = df.drop("_rescued_data").filter(col("close").isNotNull())

    formatted_df = cleaned_df.withColumn(
        "tradeTimestamp",
        from_unixtime(
            unix_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ssXXX"),
            "yyyy-MM-dd HH:mm:ss.SSS"
        )
    )

    return formatted_df


@dlt.table(name="soybeans_gold", table_properties={"quality": "gold"})
def create_gold_table():
    df_silver = dlt.read("soybeans_silver")

    # Compute a 7-day rolling average of the close price
    windowSpec = Window.partitionBy("symbol").orderBy("tradeTimestamp").rowsBetween(-6, 0)
    avg_price = avg(col("close").cast("double")).over(windowSpec)

    # Compute daily volatility
    daily_volatility = (max(col("high").cast("double")).over(windowSpec) -
                        min(col("low").cast("double")).over(windowSpec))

    # Extract previous day's volume
    lag_window = Window.partitionBy("symbol").orderBy("tradeTimestamp").rowsBetween(-1, -1)
    prev_day_volume = lag(col("volume"), 1, 0).over(lag_window)

    df_gold = df_silver.withColumn("7_day_avg_close", avg_price) \
        .withColumn("daily_volatility", daily_volatility) \
        .withColumn("prev_day_volume", prev_day_volume)

    return df_gold


@dlt.table(
    comment="DLT for predictions scored by soybeans_volatility model based on models.soybeans.soybeans_gold Delta table.",
    name="soybeans_gold_preds",
    table_properties={
        "quality": "gold"
    }
)
def soybeans_volatility_predictions():
    input_dlt_table_name = "soybeans_gold"
    input_delta_live_table = dlt.read(input_dlt_table_name)

    input_dlt_table_columns = input_delta_live_table.columns

    predictions_df = input_delta_live_table.withColumn('prediction', predict(struct(*input_dlt_table_columns)))

    return predictions_df


I've tried everything: I removed the virtualenv env_manager and ran the pipeline on the Current channel (non-preview), but no luck.
 

 

1 ACCEPTED SOLUTION


Hi @Data_Interlaced 

I just encountered this issue as well. I compared the libraries installed in my non-ML and ML clusters with pip freeze and found version discrepancies between them. 

In my case, I resolved it by installing the same versions of the libraries that the ML cluster uses. In particular, I needed the following:

importlib-metadata==4.11.3
zipp==3.8.0

Hope this helps.
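For anyone comparing environments, a minimal sketch of how to list what the current environment resolves for these two packages (assuming you can run it as a notebook cell in both the ML cluster and the DLT/non-ML environment; the versions to pin come from the ML runtime's output):

# Print the installed versions of importlib-metadata and zipp so they can be
# compared between the ML runtime cluster and the DLT (non-ML) environment.
import subprocess
import sys

frozen = subprocess.run(
    [sys.executable, "-m", "pip", "freeze"],
    capture_output=True, text=True, check=True,
).stdout

for line in frozen.splitlines():
    if line.lower().startswith(("importlib-metadata", "importlib_metadata", "zipp")):
        print(line)

Whatever the ML runtime reports is what you pin in the DLT notebook; 4.11.3 and 3.8.0 above were simply the versions that matched that particular runtime.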


12 REPLIES

Data_Interlaced
New Contributor III

I have the same problem with a simple Python UDF in a DLT pipeline: it says it can't find importlib_metadata.

What does your code look like? I am going to attend office hours.

UDF registry file:

# Databricks notebook source
# MAGIC %pip install mlflow

# COMMAND ----------

import dlt

# COMMAND ----------

import mlflow
from pyspark.sql.functions import struct, col
logged_model = 'runs:/xxxx/model_overstort_all_devices'

# Load model as a Spark UDF. Override result_type if the model does not return double values.
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model, result_type='double')

spark.udf.register("detect_anomaly", loaded_model)


Actual pipeline:

CREATE OR REFRESH STREAMING LIVE TABLE iot_overstort_predict
COMMENT "Predicting anomalies in measurement data using the Isolation Forest model"
TBLPROPERTIES ("quality" = "gold")
AS SELECT
    utc_timestamp,
    measurement_value_float,
    topic,
    YEAR(utc_timestamp) as year,
    MONTH(utc_timestamp) as month,
    DAY(utc_timestamp) as day,
    hour(utc_timestamp) as hour,
    detect_anomaly(measurement_value_float)
FROM STREAM(aqf_tda.test.iot_overstort_sample_v1)



To be clear, the UDF registry notebook and the SQL pipeline code are two separate files in the DLT pipeline.

Hi Kaniz,

!pip show importlib_metadata shows the info of the module as expected.
I can see that even when running batch inference or structured streaming I get the same error.

As soon as I use MLflow on a non-ML runtime cluster, the error shows. Maybe there is an incompatibility?

I can't use an ML runtime because I am using a Unity Catalog enabled cluster. This is a shared cluster capable of accessing the data I need, and the ML runtime is not supported on UC-enabled shared clusters.

During training of the model I do get the following warning, though. Maybe this is related?

warnings.warn(
2023/11/14 08:59:54 WARNING mlflow.models.model: Logging model metadata to the tracking server has failed, possibly due older server version. The model artifacts have been logged successfully under dbfs:/databricks/mlflow-tracking/f5f6e63a202844138b5fad3fddd0007a/e570c1e2b3ab4cb9b220046a5e5c3a64/artifacts. In addition to exporting model artifacts, MLflow clients 1.7.0 and above attempt to record model metadata to the tracking store. If logging to a mlflow server via REST, consider upgrading the server version to MLflow 1.7.0 or above. Set logging level to DEBUG via `logging.getLogger("mlflow").setLevel(logging.DEBUG)` to see the full traceback.
/databricks/python/lib/python3.9/site-packages/sklearn/base.py:450: UserWarning: X does not have valid feature names, but IsolationForest was fitted with feature names

 


@BarryC this has worked. 

%pip install importlib-metadata==4.11.3
%pip install zipp==3.8.0

Adding this at the start of your DLT UDF registration notebook will solve the issue.
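For example, the top of the UDF registration notebook would then look roughly like this (a sketch only; the mlflow install is the one already used above, and the pinned versions should be matched to your own ML runtime):

# Databricks notebook source
# First cell: notebook-scoped installs with the pinned versions that match the ML runtime.
# MAGIC %pip install mlflow
# MAGIC %pip install importlib-metadata==4.11.3 zipp==3.8.0

# COMMAND ----------

import dlt
import mlflow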

Databricks advocates in all docs and tutorials for using DLT for ML inference, but this is a standard incompatibility inherent to the setup. I hope Databricks will take action and resolve this ASAP.


Kind regards,

Data Interlaced ltd

 

Feather
New Contributor III

Hi @Retired_mod, when running the code in a DLT pipeline, we don't get to choose which cluster the pipeline will use. The DLT pipeline just stands up a temporary cluster that it uses for the run. Apologies if I am technically off, but from a high level that's the way I see DLT pipelines working.

The only place I see where the DLT pipeline can be configured with additional libraries is within the pipeline notebook itself, by installing with syntax such as # MAGIC %pip install mlflow, etc. I've also tried to resolve the error by adding # MAGIC %pip install importlib_metadata (as shown in the code in my post), but no luck.

As the documentation states, installing the mlflow library has to be done within the notebook of the DLT pipeline run.


 

BarryC
New Contributor III

Hi @Feather 

Have you also tried specifying the version of the library?

Feather
New Contributor III

Hi @BarryC ,

Your solution worked! I will vote your solution up.

Although I did get a new error (below). I think this one is my own fault; I might have to open a new question for it.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 509.0 failed 4 times, most recent failure: Lost task 0.3 in stage 509.0 (TID 830) (10.186.194.4 executor 0): org.apache.spark.SparkRuntimeException: [UDF_USER_CODE_ERROR.GENERIC] Execution of function udf(named_struct(close, close#46263, high, high#46264, low, low#46265, open, open#46266, symbol, symbol#46267, timestamp, timestamp#46268, tradingDay, tradingDay#46269, volume, volume#46270, yr, yr#46271, month, month#46272, dayy, dayy#46273, tradeTimestamp, tradeTimestamp#46274, ... 6 more fields)) failed. 
== Error ==
BrokenPipeError: [Errno 32] Broken pipe

 

Thanks!

jsc
New Contributor II

Were you able to fix this error? I am getting the same error.
