Scaling issue for inference with a spark.mllib model

admo
New Contributor III

Hello,

I'm writing this because I have tried a lot of different directions to get simple model inference working, with no success.

Here is the outline of the job:

# 1 - Load the base data (~1 billion rows of ~6 columns)
interaction = build_initial_df()

# 2 - Preprocess some of the data (~200 to 2000 columns)
feature_df = add_metadata(interaction, feature_transformer_model)

# 3 - Cross-feature building and logistic regression inference
model = PipelineModel.load(model_save_path)
prediction_per_user = model.transform(feature_df)
final_df = prediction_per_user.select(...)

# 4 - Write the final result
write_df_to_snowflake(final_df)

This works reasonably well when the input data is around 100x smaller, but it fails at full scale.

The cluster size is reasonable: up to 40 r5d.2xlarge instances, giving a bit more than 2 TB of RAM.

Execution problem:

Very intense resource usage and a lot of disk spill.

Question:

My understanding is that model inference is similar to a map operation: it is highly divisible and can run in parallel given a reasonable amount of compute.

How could I make it work within my resource budget? What am I missing?

Already tried:

1) Using an MLflow model UDF (roughly as in the sketch after this list), but it doesn't work on the list features output by a previous feature pipeline model.

2) Disabling some of Spark's optimizations, because they would run a stage that filled a single executor's disk or timed out.

3) Forcing a repartition of the DataFrame to process the dataset in smaller chunks (this may be improvable).
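For reference, attempt 1 was roughly the following (a sketch, assuming a live spark session as in a Databricks notebook; mlflow_model_uri is a hypothetical URI for the logged model, and passing every column is illustrative):

import mlflow.pyfunc

# mlflow_model_uri is hypothetical, e.g. a "runs:/..." or "models:/..." URI
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri=mlflow_model_uri)
scored = feature_df.withColumn("prediction", predict_udf(*feature_df.columns))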

Thanks in advance

5 REPLIES

Hubert-Dudek
Esteemed Contributor III

It is hard to analyze without the Spark UI and more detailed information, but here are a few tips anyway:

  • Look for data skew: some partitions can be very big and some small because of incorrect partitioning. You can use the Spark UI for this, but also debug your code a bit (check getNumPartitions()).
  • Increase the shuffle partition count: spark.sql.shuffle.partitions defaults to 200; try a bigger value, I would go to at least 1000.
  • Increase the driver size to be twice as big as an executor (but to get the optimal size, please analyze the load: in Databricks, on the cluster tab, look at Metrics, where there is Ganglia, or even better, integrate Datadog with the cluster).
  • Make sure everything runs in a distributed way, especially UDFs; use vectorized pandas UDFs so they run on the executors (a minimal sketch follows this list).
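A minimal sketch of the last two tips, assuming a Spark 3.x cluster with pyarrow available (the column and function names are placeholders, not the poster's actual scoring logic):

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf

spark = SparkSession.builder.getOrCreate()

# Raise the shuffle partition count from the default of 200.
spark.conf.set("spark.sql.shuffle.partitions", "1000")

# A vectorized (pandas) UDF receives a whole Arrow batch as a pandas Series
# and runs on the executors, instead of making one Python call per row.
@pandas_udf("double")
def squared(v: pd.Series) -> pd.Series:
    return (v * v).astype("float64")

df = spark.range(1000000).withColumn("out", squared("id"))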

admo
New Contributor III

Thanks for the help.

  • Skew: I did not focus on this point, I'll have a look.
  • spark.sql.shuffle.partitions: already done.
  • Bigger driver: already done.
  • There is indeed a UDF used to retrieve the prediction score; I will have a look at that too.

Update:

  • Skew: there was a large one, but the problem remained after the fix.
  • UDF: only a model UDF exists, which should already be optimized.

Kaniz
Community Manager

Hi @Adrien Mo, how are you? Were you able to resolve the problem?

admo
New Contributor III

Hello,

Thanks for checking, unfortunately no.

I believe my core issue is that I cannot properly set the size of the processed chunks of data relative to my cluster memory.

My job boils down to these few map operations:

prepare data <200 columns> => inference <40143 columns + model size> => final result <3 columns>

I do understand that the inference part can be costly. But if it is processed per chunk then, worst case, it should be slow, not fail with memory issues.

Any guidance on forcing this behavior would be welcome.
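Something like this is what I have in mind (an untested sketch; the 10,000-row target and the extra count() pass are just illustrative):

# Derive the partition count from a target chunk size, so each partition
# stays small enough to fit comfortably in executor memory.
target_rows = 10000
n_partitions = max(1, feature_df.count() // target_rows)
prediction_per_user = model.transform(feature_df.repartition(n_partitions))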

PS: Things have also become more difficult with the loss of logging info in the Spark UI and Ganglia. Would you know what the root cause may be?

[Screenshot: 2022-03-22 at 16:34]

admo
New Contributor III

The philosophy for the job would be something like this in Scala:

feature_dataset.foreachPartition { block =>
  block.grouped(10000).foreach { chunk =>
    run_inference_and_write_to_db(chunk)
  }
}

Would you know how to do this with PySpark and RDDs?
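I imagine something like the following (untested; run_inference_and_write_to_db is the same hypothetical helper as in the Scala version):

from itertools import islice

def process_partition(rows):
    # `rows` is an iterator over one partition; pull it in fixed-size
    # chunks so only ~10,000 rows are materialized at a time.
    while True:
        chunk = list(islice(rows, 10000))
        if not chunk:
            break
        run_inference_and_write_to_db(chunk)  # hypothetical helper

feature_dataset.rdd.foreachPartition(process_partition)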
