I am trying to track the lineage of model and tables using the FeatureEngineeringClient. The table lineage shows the relevant tables and notebooks but the model lineage shows only the model. No notebook and tables. here is my code
fe = FeatureEngineeringClient()
def split_data():
spark = SparkSession.builder.getOrCreate()
catalog_name = config["catalog_name"]
gold_layer = config["gold_layer_name"]
silver_layer = config["silver_layer_name"]
user_item_table_name = config["user_item_table_name"]
ft_user_item_name = config["ft_user_item_name"]
SEED = 4
df_ratings = spark.table(f"{catalog_name}.{silver_layer}.{user_item_table_name}")
table_name = f"{catalog_name}.{gold_layer}.{ft_user_item_name}"
lookup_key = config["ft_user_item_pk"]
label = config["label_col"]
model_feature_lookups = [FeatureLookup(table_name=table_name, lookup_key=lookup_key)]
# fe.create_training_set looks up features in model_feature_lookups that match the primary key from df_ratings
fe_data = fe.create_training_set(df=df_ratings, feature_lookups=model_feature_lookups, label=label, exclude_columns=["rating_date_dayofmonth","rating_date_month"])
df_data = fe_data.load_df()
df_data = df_data.na.drop()
(df_train, df_test) = df_data.randomSplit([0.75,0.25],SEED)
print(f'full dataset: {df_data.count()}' ,f'Training: {df_train.count()}', f'test: {df_test.count()}\n')
return (fe_data, df_data, df_train, df_test)
with mlflow.start_run(run_name="ALS_final_model") as run:
fe_full_data, df_full_data, df_train, df_test = split_data()
als = ALS()
als.setMaxIter(MAX_ITER)\
.setSeed(SEED)\
.setRegParam(best_params["REG_PARAM"])\
.setUserCol(COL_USER)\
.setItemCol(COL_ITEM)\
.setRatingCol(COL_LABEL)\
.setRank(best_params["RANK"])
mlflow.log_param("MAX_ITER", MAX_ITER)
mlflow.log_param("RANK", best_params["RANK"])
mlflow.log_param("REG_PARAM", best_params["REG_PARAM"])
model = als.fit(df_full_data)
model.setColdStartStrategy('drop')
predictions = model.transform(df_full_data)
model_info = fe.log_model(model=model,
artifact_path = model_name,
flavor=mlflow.spark,
training_set=fe_full_data,
conda_env=mlflow.spark.get_default_conda_env(),
registered_model_name= f"{catalog_name}.feature_store.{model_name}"
)
evaluator = RegressionEvaluator(predictionCol=COL_PRED, labelCol=COL_LABEL)
rmse = evaluator.setMetricName("rmse").evaluate(predictions)
mlflow.log_metric('rmse', rmse)
Attached you see the screenshot of my lineage graphs for model and tables.
Any idea what could the problem?