
Scaling issue for inference with a spark.mllib model

admo
New Contributor III

Hello,

I'm writing because I have tried a lot of different approaches to get a simple model inference job working, with no success.

Here is the outline of the job

# 1 - Load the base data (~1 billion rows, ~6 columns)
interaction = build_initial_df()
 
# 2 - Preprocessing of some of the data (~ 200 to 2000 columns) 
feature_df = add_metadata(interaction, feature_transformer_model)
 
# 3 - cross feature building and logistic regression inference
model = PipelineModel.load(model_save_path)
prediction_per_user = model.transform(feature_df)
final_df = prediction_per_user.select(...)
 
# 4 - write the final result
write_df_to_snowflake(final_df)

This works reasonably well when the input data is about 100× smaller, but it fails at full scale.

The cluster size is reasonable: up to 40 r5d.2xlarge instances, giving a bit more than 2 TB of RAM.

Execution problem:

Very intense resource usage and a lot of disk spill.

Question:

My understanding is that model inference is essentially a map operation, so it should split into small pieces and run in parallel given a reasonable amount of compute.

How can I make it work within my resource budget? What am I missing?

Already tried:

1) Using an MLflow model UDF, but it does not work on the array features output by the upstream feature pipeline model (roughly the approach sketched below).

2) Disabling some of Spark's optimizations, because otherwise one stage would fill a single executor's disk or time out.

3) Forcing a repartition of the DataFrame to process the dataset in smaller chunks (this could probably be improved).
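
For point 1, the MLflow model UDF approach I mean is roughly the following (the model URI and result column name are placeholders, not my real values):

import mlflow.pyfunc

# load the logged model as a Spark UDF; the URI below is only a placeholder
predict_udf = mlflow.pyfunc.spark_udf(spark, model_uri="models:/my_model/1", result_type="double")

# this is where it breaks for me: the upstream feature pipeline produces
# array/vector columns, which the pyfunc UDF does not accept as plain inputs
scored = feature_df.withColumn("prediction", predict_udf(*feature_df.columns))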

Thanks in advance

4 REPLIES

Hubert-Dudek
Esteemed Contributor III

It is hard to analyze without the Spark UI and more detailed information, but here are a few tips anyway:

  • Look for data skew: some partitions can be very big and some very small because of incorrect partitioning. You can use the Spark UI for that, but also debug your code a bit (check getNumPartitions()).
  • Increase spark.sql.shuffle.partitions: the default is 200, try something bigger; I would go to at least 1000.
  • Increase the driver size to about 2× the executor size (but to find the optimal size please analyze the load: in Databricks, on the cluster page, look at the Metrics tab for Ganglia, or even better integrate Datadog with the cluster).
  • Make sure everything runs in a distributed way, especially UDFs; use vectorized pandas UDFs so they run on the executors (see the sketch below).
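
A quick sketch of what I mean (the shuffle value is only a starting point, and feature_df is the DataFrame from your outline):

from pyspark.sql.functions import spark_partition_id, pandas_udf
import pandas as pd

# check partition count and look for skew: rows per partition
print(feature_df.rdd.getNumPartitions())
feature_df.groupBy(spark_partition_id().alias("pid")) \
    .count() \
    .orderBy("count", ascending=False) \
    .show(10)

# more shuffle partitions than the default 200
spark.conf.set("spark.sql.shuffle.partitions", 1000)

# vectorized (pandas) UDF so custom logic runs in batches on the executors
@pandas_udf("double")
def score_batch(x: pd.Series) -> pd.Series:
    # placeholder logic only; the real scoring stays in the pipeline model
    return x * 1.0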

admo
New Contributor III

Thanks for the help.

  • Skew: I did not focus on this point, I'll have a look.
  • spark.sql.shuffle.partitions: already done.
  • Bigger driver: already done.
  • There is indeed a UDF used to retrieve the prediction score; I will have a look at that too.

Update:

  • Skew: there was a large one, but the problem remained after the fix.
  • UDF: only the model UDF exists, which should already be optimized.

admo
New Contributor III

Hello,

Thanks for checking, unfortunately no.

I believe my core issue is that I cannot properly control the size of the data chunks being processed relative to my cluster memory.

My job boils down to a few map operations:

prepare data <200 columns> => inference <40143 columns + model size> => final result <3 columns>

I understand that the inference part can be costly, but if it is processed chunk by chunk, the worst case should be that it is slow, not that it fails with memory issues.

Any guidance on forcing this behavior would be welcome.
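
So far the closest I have gotten is repartitioning before the transform, so each task only holds a small slice of the wide feature matrix, roughly like this (the partition count is a guess):

# force many small partitions before inference so each task holds a small slice of the data
feature_df = feature_df.repartition(4000)
prediction_per_user = model.transform(feature_df)

But that alone has not prevented the failures.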

PS: things have also become harder since I lost logging information in the Spark UI and Ganglia. Would you know what the root cause might be?

(Screenshot from 2022-03-22 attached)

admo
New Contributor III

The philosophy for the job would be something like this in Scala:

feature_dataset.foreachPartition { block =>
  block.grouped(10000).foreach { chunk =>
    run_inference_and_write_to_db(chunk)
  }
}

Would you know how to do this with PySpark and RDDs?
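
Roughly, what I am picturing in PySpark, going through the underlying RDD, is something like this (not validated; run_inference_and_write_to_db is the same placeholder helper as in the Scala sketch):

from itertools import islice

def process_partition(rows):
    # group the partition's rows into chunks of 10,000, same idea as grouped(10000) above
    rows = iter(rows)
    while True:
        chunk = list(islice(rows, 10000))
        if not chunk:
            break
        run_inference_and_write_to_db(chunk)

feature_dataset.rdd.foreachPartition(process_partition)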
