03-17-2022 02:11 AM
Hello,
I'm writing because I have tried a lot of different approaches to get a simple model inference job working, with no success.
Here is the outline of the job:
# 1 - Load the base data (~1 billion lines of ~6 columns)
interaction = build_initial_df()
# 2 - Preprocessing of some of the data (~ 200 to 2000 columns)
feature_df = add_metadata(interaction, feature_transformer_model)
# 3 - cross feature building and logistic regression inference
model = PipelineModel.load(model_save_path)
prediction_per_user = model.transform(feature_df)
final_df = prediction_per_user.select(...)
# 4 - write the final result
write_df_to_snowflake(final_df)
This works reasonably well when the input data is about 100× smaller, but it fails at full scale.
The cluster size is reasonable: up to 40 r5d.2xlarge instances, giving a bit more than 2 TB of RAM.
Execution problem:
Very intense resource usage and a lot of disk spill.
Question:
My understanding is that model inference is essentially a map operation, hence highly divisible and parallelizable given a reasonable amount of compute.
How could I make it work within my resource budget? What am I missing?
Already tried:
1) Using an MLflow model UDF, but it doesn't work on the list features output by the upstream feature pipeline model (see the sketch after this list).
2) Disabling some of Spark's optimizations, because otherwise it would run a stage that filled a single executor's disk or timed out.
3) Forcing a repartition of the dataframe to process the dataset in smaller chunks (this could probably be improved).
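For point 1, one workaround I have considered (sketched below, not validated yet) is to flatten the feature vector into scalar columns before handing it to the MLflow UDF. The column names, mlflow_model_uri and n_features are placeholders, and this only helps if the logged model expects one primitive column per feature rather than a vector.
import mlflow.pyfunc
from pyspark.ml.functions import vector_to_array
from pyspark.sql import functions as F

n_features = 200  # placeholder: must match the feature pipeline's output size
flat_df = (
    feature_df
    .withColumn("features_arr", vector_to_array("features"))  # Vector column -> array column
    .select("user_id", *[F.col("features_arr")[i].alias(f"f_{i}") for i in range(n_features)])
)
# Only valid if the logged model was trained against one scalar column per feature
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri=mlflow_model_uri, result_type="double")
prediction_per_user = flat_df.withColumn(
    "prediction", predict_udf(*[f"f_{i}" for i in range(n_features)])
)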
Thanks in advance
03-17-2022 03:42 AM
It is hard to analyze without the Spark UI and more detailed information, but here are a few tips anyway:
03-17-2022 05:28 AM
Thanks for the help.
Update :
03-22-2022 08:04 AM
Hi @Adrien Mo, how are you? Were you able to resolve the problem?
03-22-2022 08:53 AM
Hello,
Thanks for checking, unfortunately no.
I believe my core issue is that I cannot properly control the size of the chunks of data being processed relative to my cluster's memory.
My job boils down to a few map operations:
prepare data <200 columns> => inference <40143 columns + model size> => final result <3 columns>
I understand that the inference step can be costly, but if it were processed chunk by chunk, the worst case should be a slow job, not a failure due to memory issues.
Any guidance on forcing this behavior would be welcome.
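Concretely, the behavior I would like to force is something along these lines, i.e. bounding how many rows each task holds before the expensive transform so the wide intermediate stays small per partition (the row target below is purely illustrative):
rows_per_partition = 500000      # purely illustrative, would need tuning
n_rows = feature_df.count()      # or a cheaper known estimate of the row count
n_partitions = max(1, int(n_rows / rows_per_partition))
# Repartition before the costly transform so each task only materializes a bounded slice
prediction_per_user = model.transform(feature_df.repartition(n_partitions))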
PS: Things have also become more difficult with the loss of logging info in the Spark UI and Ganglia. Would you know what the root cause might be?
03-22-2022 09:01 AM
The philosophy of the job would be something like this in Scala:
feature_dataset.foreachPartition { block =>
  // Split each partition into fixed-size chunks so memory use stays bounded
  block.grouped(10000).foreach { chunk =>
    run_inference_and_write_to_db(chunk)
  }
}
Would you know how to do this with PySpark and RDDs?
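Roughly, I imagine the PySpark version would look something like the sketch below (run_inference_and_write_to_db is the same hypothetical helper as above), but I would appreciate confirmation that this is the right pattern:
from itertools import islice

def process_partition(rows):
    # Mirror of the Scala sketch: consume the partition iterator in fixed-size chunks
    rows = iter(rows)
    while True:
        chunk = list(islice(rows, 10000))
        if not chunk:
            break
        run_inference_and_write_to_db(chunk)

feature_df.rdd.foreachPartition(process_partition)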