
DLT pipeline MLFlow UDF error

Feather
New Contributor III

I am running this notebook via a DLT pipeline in preview mode.

Everything works up until the predictions table, which should be created by running inference on the gold table with a registered model.

Feather_0-1699311273694.png

This is the error: com.databricks.spark.safespark.UDFException: INVALID_ARGUMENT: No module named 'importlib_metadata'
 
# Databricks notebook source
# MAGIC %pip install mlflow
# MAGIC %pip install importlib_metadata

# COMMAND ----------

import mlflow
import importlib_metadata
model_uri = "models:/soybeans_volatility/1"

# create spark user-defined function for model prediction.
predict = mlflow.pyfunc.spark_udf(spark, model_uri, result_type="double", env_manager='virtualenv')


# COMMAND ----------

import dlt
from pyspark.sql.functions import avg, max, min, col, lag, count, when, struct, from_unixtime, unix_timestamp
from pyspark.sql.window import Window

path_to_uc_external_location = "s3://gfy-databricks-storage/data/barchart/soybeans/"

@dlt.table(name="soybeans_bronze", table_properties={"quality": "bronze"})
def table_name():
    return (
        spark.readStream
        .format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load(path_to_uc_external_location)
    )


@dlt.table(name="soybeans_silver", table_properties={"quality": "silver"})
def create_silver_table():
    df = dlt.read("soybeans_bronze")

    cleaned_df = df.drop("_rescued_data").filter(col("close").isNotNull())

    formatted_df = cleaned_df.withColumn(
        "tradeTimestamp",
        from_unixtime(
            unix_timestamp(col("timestamp"), "yyyy-MM-dd'T'HH:mm:ssXXX"),
            "yyyy-MM-dd HH:mm:ss.SSS"
        )
    )

    return formatted_df

@dlt.table(name="soybeans_gold", table_properties={"quality": "gold"})
def create_gold_table():
    df_silver = dlt.read("soybeans_silver")

    # Compute a rolling average of the close price over the current row and the
    # 6 preceding rows (7 days if there is exactly one row per symbol per day)
    windowSpec = Window.partitionBy("symbol").orderBy("tradeTimestamp").rowsBetween(-6, 0)
    avg_price = avg(col("close").cast("double")).over(windowSpec)

    # Compute volatility as the high-low range over the same 7-row window
    daily_volatility = (max(col("high").cast("double")).over(windowSpec) -
                        min(col("low").cast("double")).over(windowSpec))

    # Extract the previous row's volume (defaults to 0 for the first row per symbol)
    lag_window = Window.partitionBy("symbol").orderBy("tradeTimestamp").rowsBetween(-1, -1)
    prev_day_volume = lag(col("volume"), 1, 0).over(lag_window)

    df_gold = df_silver.withColumn("7_day_avg_close", avg_price) \
        .withColumn("daily_volatility", daily_volatility) \
        .withColumn("prev_day_volume", prev_day_volume)

    return df_gold


@dlt.table(
    comment="DLT for predictions scored by soybeans_volatility model based on models.soybeans.soybeans_gold Delta table.",
    name="soybeans_gold_preds",
    table_properties={
        "quality": "gold"
    }
)
def soybeans_volatility_predictions():
    input_dlt_table_name = "soybeans_gold"
    input_delta_live_table = dlt.read(input_dlt_table_name)

    input_dlt_table_columns = input_delta_live_table.columns

    predictions_df = input_delta_live_table.withColumn('prediction', predict(struct(*input_dlt_table_columns)))

    return predictions_df


I've tried everything I can think of: I removed the virtualenv setting and ran the pipeline in current (non-preview) mode, but no luck.
 
Feather_1-1699311414386.png

 

1 ACCEPTED SOLUTION


Hi @Data_Interlaced 

I just encountered this issue as well. I compared the libraries installed on my non-ML and ML clusters with pip freeze and found version discrepancies between them.

In my case, I resolved it by installing the library versions found on the ML cluster. In particular, I needed the following:

importlib-metadata==4.11.3
zipp==3.8.0

Hope this helps.


14 REPLIES

Data_Interlaced
New Contributor III

I have the same problem with a simple Python UDF in a DLT pipeline: it says it can't find importlib_metadata.

What does your code look like? I am going to attend office hours.

UDF registry file:

# Databricks notebook source
# MAGIC %pip install mlflow

# COMMAND ----------

import dlt

# COMMAND ----------

import mlflow
from pyspark.sql.functions import struct, col
logged_model = 'runs:/xxxx/model_overstort_all_devices'

# Load model as a Spark UDF. Override result_type if the model does not return double values.
loaded_model = mlflow.pyfunc.spark_udf(spark, model_uri=logged_model, result_type='double')

spark.udf.register("detect_anomaly", loaded_model)

Actual pipeline code:
CREATE OR REFRESH STREAMING LIVE TABLE iot_overstort_predict
COMMENT "Predicting anomalies in measurement data using the Isolation Forest model"
TBLPROPERTIES ("quality" = "gold")
AS SELECT 
    utc_timestamp,
    measurement_value_float,
    topic, 
    YEAR(utc_timestamp) as year, 
    MONTH(utc_timestamp) as month, 
    DAY(utc_timestamp) as day, 
    hour(utc_timestamp) as hour,
    detect_anomaly(measurement_value_float)
FROM STREAM(aqf_tda.test.xxx)

 

Actual pipeline:

CREATE OR REFRESH STREAMING LIVE TABLE iot_overstort_predict
COMMENT "Predicting anomalies in measurement data using the Isolation Forest model"
TBLPROPERTIES ("quality" = "gold")
AS SELECT
    utc_timestamp,
    measurement_value_float,
    topic,
    YEAR(utc_timestamp) as year,
    MONTH(utc_timestamp) as month,
    DAY(utc_timestamp) as day,
    hour(utc_timestamp) as hour,
    detect_anomaly(measurement_value_float)
FROM STREAM(aqf_tda.test.iot_overstort_sample_v1)



To be clear, the UDF registry file and the SQL pipeline code are two separate files in the DLT pipeline.

Kaniz_Fatma
Community Manager

Hi @Feather

You can check if a module is installed in the correct environment by running the following command in a new cell:

!pip show <module-name>

This command will display information about the module, including its version, location, and other details. If the module is installed in the correct environment, you should see its details displayed in the output. If the module is not installed, you will see an error message.

Alternatively, you can also use the following command to list all the installed modules in your environment:

!pip freeze

This command will display a list of all the installed modules in your environment and their versions. You can search for the module you are looking for in this list to see if it is installed.
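
The same check can also be done from Python. The following is a minimal sketch using the standard-library importlib.metadata module (Python 3.8+); the helper name installed_version is hypothetical. Note that it looks up the distribution name (importlib-metadata, not the import name importlib_metadata), and that it only reports on the environment where the cell runs, which may not be the environment the UDF executes in.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


# Report the packages discussed in this thread.
for name in ("importlib-metadata", "zipp", "mlflow"):
    print(f"{name}: {installed_version(name) or 'not installed'}")
```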

I hope this helps! Let me know if you have any other questions.

Hi Kaniz,

!pip show importlib_metadata shows the module's info as expected.
I get the same error even when running batch inference or Structured Streaming.

The error appears as soon as I use MLflow on a non-ML runtime cluster. Maybe there is an incompatibility?

Kaniz_Fatma
Community Manager

Hi @Data_Interlaced

This could be due to an incompatibility. I found a Databricks community post suggesting that this issue can occur when the code runs on a non-ML cluster, even if you install the ML libraries yourself. Using an ML cluster should resolve it.

If you are not using Databricks, you can try checking if your runtime environment is compatible with the version of MLflow you are using. You can also try installing the required libraries into the existing cluster.

I hope this helps! Let me know if you have any other questions.

I can't use an ML runtime because I am using a Unity Catalog-enabled cluster. This is a shared cluster that can access the data I need, and the ML runtime is not supported on UC-enabled shared clusters.

During training of the model, I also get the following warning. Maybe this is related?

warnings.warn(
2023/11/14 08:59:54 WARNING mlflow.models.model: Logging model metadata to the tracking server has failed, possibly due older server version. The model artifacts have been logged successfully under dbfs:/databricks/mlflow-tracking/f5f6e63a202844138b5fad3fddd0007a/e570c1e2b3ab4cb9b220046a5e5c3a64/artifacts. In addition to exporting model artifacts, MLflow clients 1.7.0 and above attempt to record model metadata to the tracking store. If logging to a mlflow server via REST, consider upgrading the server version to MLflow 1.7.0 or above. Set logging level to DEBUG via `logging.getLogger("mlflow").setLevel(logging.DEBUG)` to see the full traceback.
/databricks/python/lib/python3.9/site-packages/sklearn/base.py:450: UserWarning: X does not have valid feature names, but IsolationForest was fitted with feature names

 

Hi @Data_Interlaced 

I just encountered this issue as well. I compared the libraries installed on my non-ML and ML clusters with pip freeze and found version discrepancies between them.

In my case, I resolved it by installing the library versions found on the ML cluster. In particular, I needed the following:

importlib-metadata==4.11.3
zipp==3.8.0

Hope this helps.
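
For anyone who wants to repeat the comparison described above, here is a rough sketch that diffs two pip freeze outputs and lists packages whose pinned versions differ. The function names parse_freeze and version_mismatches are hypothetical, and the non-ML versions in the sample input are placeholders, not values from a real cluster; only the ML-side pins come from this thread.

```python
from typing import Dict, Tuple


def parse_freeze(text: str) -> Dict[str, str]:
    """Map package name -> pinned version from `pip freeze` output."""
    pins = {}
    for line in text.splitlines():
        line = line.strip()
        # Skip blanks, comments, and entries without an exact pin (e.g. "pkg @ file://...").
        if not line or line.startswith("#") or "==" not in line:
            continue
        name, version = line.split("==", 1)
        # Normalise names so "importlib_metadata" and "importlib-metadata" match.
        pins[name.strip().lower().replace("_", "-")] = version.strip()
    return pins


def version_mismatches(freeze_a: str, freeze_b: str) -> Dict[str, Tuple[str, str]]:
    """Return packages present in both outputs whose versions differ."""
    pins_a, pins_b = parse_freeze(freeze_a), parse_freeze(freeze_b)
    shared = sorted(pins_a.keys() & pins_b.keys())
    return {n: (pins_a[n], pins_b[n]) for n in shared if pins_a[n] != pins_b[n]}


# Placeholder input: paste the real `pip freeze` output from each cluster instead.
non_ml = "importlib-metadata==6.0.0\nzipp==3.11.0\nnumpy==1.21.5"
ml = "importlib-metadata==4.11.3\nzipp==3.8.0\nnumpy==1.21.5"
print(version_mismatches(non_ml, ml))
```

Any package reported here is a candidate for pinning with %pip install at the top of the notebook that registers the UDF.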

@BarryC this has worked. 

%pip install importlib-metadata==4.11.3
%pip install zipp==3.8.0

Adding this at the start of your DLT UDF register notebook will solve the issue.

Databricks is advocating in all docs and tutorials to use DLT for ML inference, but this is a standard incompatibility inherent to the setup. I hope Databricks will take action and resolve this asap. 


Kind regards,

Data Interlaced ltd

 

Feather
New Contributor III

Hi @Kaniz_Fatma, when running the code in a DLT pipeline, we don't get to choose which cluster the pipeline uses. The pipeline just stands up a temporary cluster that it uses for the run. Apologies if I am technically off, but from a high level that's how I see the DLT pipeline working.

The only place I see where the pipeline can be configured with additional libraries is within the pipeline code itself, by installing them with syntax such as # MAGIC %pip install mlflow. I also tried to resolve the error by adding # MAGIC %pip install importlib_metadata (as shown in the code in my post), but no luck.

As the documentation states, the mlflow library has to be installed within the notebook that the DLT pipeline runs.

Feather_0-1700022248426.png

 

BarryC
New Contributor III

Hi @Feather 

Have you also tried specifying the version of the library?

Feather
New Contributor III

Hi @BarryC ,

Your solution worked! I will vote your solution up.

I did get a new error (below), though. I think this one is my own mistake, so I might have to open a new question for it.

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 509.0 failed 4 times, most recent failure: Lost task 0.3 in stage 509.0 (TID 830) (10.186.194.4 executor 0): org.apache.spark.SparkRuntimeException: [UDF_USER_CODE_ERROR.GENERIC] Execution of function udf(named_struct(close, close#46263, high, high#46264, low, low#46265, open, open#46266, symbol, symbol#46267, timestamp, timestamp#46268, tradingDay, tradingDay#46269, volume, volume#46270, yr, yr#46271, month, month#46272, dayy, dayy#46273, tradeTimestamp, tradeTimestamp#46274, ... 6 more fields)) failed. 
== Error ==
BrokenPipeError: [Errno 32] Broken pipe

 

Thanks!

jsc
New Contributor II

Were you able to fix this error? I am getting the same error.
