What is the best way to ingest GCS data into Databricks and apply Anomaly Detection Model?

Pbarbosa154

I recently started exploring data engineering and have run into some difficulties. I have a GCS bucket with millions of Parquet files, and I want to build an anomaly detection model on that data. My plan was to ingest the files into Databricks with Auto Loader, store everything in a Delta table, and then train the model with H2O Sparkling Water. What do you recommend?
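For the ingestion step described above, a minimal Auto Loader sketch could look like the following. It is not from the thread: the function name, paths and table name are hypothetical placeholders, and it assumes a Databricks Runtime recent enough to support `trigger(availableNow=True)` and a cluster that already has read access to the bucket. `spark` is the session Databricks provides in a notebook.

```python
def start_gcs_parquet_ingest(spark, source_path, schema_location,
                             checkpoint_location, target_table):
    """Incrementally load Parquet files from GCS into a Delta table with Auto Loader.

    Hypothetical helper: every argument except `spark` is a placeholder,
    e.g. source_path="gs://<bucket>/<prefix>/".
    """
    # Auto Loader ("cloudFiles") discovers new files; together with the
    # checkpoint it remembers which files were already ingested.
    stream = (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "parquet")
        # Location where Auto Loader keeps the inferred schema between runs.
        .option("cloudFiles.schemaLocation", schema_location)
        .load(source_path)
    )
    # availableNow processes all files available at start (possibly in
    # several batches) and then stops, so the job can run as a scheduled batch.
    return (
        stream.writeStream.format("delta")
        .option("checkpointLocation", checkpoint_location)
        .trigger(availableNow=True)
        .toTable(target_table)
    )
```

The function returns the streaming query, so a notebook can call `start_gcs_parquet_ingest(spark, ...)` and then block on `.awaitTermination()` until the load finishes.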

I have run into memory issues when I read the Delta table into a PySpark DataFrame and try to convert it to an H2OFrame.

I'm using a cluster with 30 GB of memory and 4 cores, and the DataFrame has shape (30448674, 213). The error I get is: "The spark driver has stopped unexpectedly and is restarting. Your notebook will be reattached automatically." In the Ganglia UI, the driver graph stays in the red while asH2OFrame() is running.
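As a rough sanity check on those numbers, the arithmetic below estimates the uncompressed size of the DataFrame. It assumes every one of the 213 columns is numeric and held as an 8-byte double; the actual schema isn't shown, and H2O compresses columns in memory, so the real footprint may differ.

```python
# Back-of-envelope size of the DataFrame described above.
# Assumption: every column is numeric and stored as an 8-byte double.
ROWS = 30_448_674
COLS = 213
BYTES_PER_VALUE = 8  # assumed float64


def raw_size_bytes(rows: int, cols: int, bytes_per_value: int = BYTES_PER_VALUE) -> int:
    """Uncompressed size of a rows x cols numeric table, in bytes."""
    return rows * cols * bytes_per_value


size = raw_size_bytes(ROWS, COLS)
print(f"cells: {ROWS * COLS:,}")
print(f"raw size: {size / 1e9:.1f} GB ({size / 2**30:.1f} GiB)")
```

Under that assumption the table is about 52 GB before any Spark or H2O overhead, which is more than the 30 GB quoted for the cluster.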

This is the code I am using:

# Imports

import h2o
from pysparkling import *
from pyspark.sql import SparkSession
from h2o.estimators import H2OIsolationForestEstimator

# Creates an H2OContext Object.

hc = H2OContext.getOrCreate(spark)

# Read the Data from Delta Table to PySpark Dataframe

df = spark.read.format("delta").load("path/to/delta_table")

# Converting the PySpark Dataframe to an H2OFrame

h2o_df = hc.asH2OFrame(df)

# H2O Model for Anomaly Detection
# (H2O estimators take keyword arguments; model_id, ntrees and seed are set elsewhere)

isolation_model = H2OIsolationForestEstimator(model_id=model_id, ntrees=ntrees, seed=seed)

isolation_model.train(training_frame=h2o_df)

# Making predictions

predictions = isolation_model.predict(h2o_df)


Anonymous

@Pedro Barbosa:

It looks like you are running out of memory while converting the PySpark DataFrame to an H2OFrame. One possible approach is to split the PySpark DataFrame into smaller pieces before converting it.

You can use the DataFrame's repartition() method to split it into smaller partitions, convert each partition to an H2OFrame separately, and then concatenate the resulting frames row-wise into the final H2OFrame.

Here is an example code snippet that shows how to partition the PySpark dataframe and convert each partition to an H2O frame:

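# Imports
import h2o
from pysparkling import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, spark_partition_id
from h2o.estimators import H2OIsolationForestEstimator

# Creates an H2OContext Object.
hc = H2OContext.getOrCreate(spark)

# Read the Data from Delta Table to PySpark Dataframe
df = spark.read.format("delta").load("path/to/delta_table")

# Partition the dataframe and tag each row with its partition number.
# DataFrame has no partition_id() method; spark_partition_id() is the
# built-in SQL function. Caching avoids recomputing the shuffle (and
# re-tagging the rows) on every loop iteration.
num_partitions = 10
df = (df.repartition(num_partitions)
        .withColumn("_pid", spark_partition_id())
        .cache())

# Convert each partition to an H2O frame
h2o_frames = []
for i in range(num_partitions):
    partition_df = df.where(col("_pid") == i).drop("_pid")
    h2o_frame = hc.asH2OFrame(partition_df)
    h2o_frames.append(h2o_frame)

# Concatenate the H2O frames row-wise to create the final H2O frame
# (h2o has no frames.concat; H2OFrame.rbind appends rows and returns a new frame)
h2o_df = h2o_frames[0]
for frame in h2o_frames[1:]:
    h2o_df = h2o_df.rbind(frame)

# H2O Model for Anomaly Detection (model_id, ntrees and seed are set elsewhere)
isolation_model = H2OIsolationForestEstimator(model_id=model_id, ntrees=ntrees, seed=seed)
isolation_model.train(training_frame=h2o_df)

# Making predictions
predictions = isolation_model.predict(h2o_df)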

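This approach may reduce the memory needed for each individual conversion step. Note, however, that the concatenated frame still has to hold the full dataset in H2O memory, so it does not shrink the total footprint.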

I already tried that approach and got the same error. Should I consider increasing the memory and cores of my cluster, or should I be able to achieve my goal with these settings?

EDIT: Also, when running your code I get AttributeError: 'DataFrame' object has no attribute 'partition_id'.
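On the question of resizing the cluster versus working within the current one, a rough sizing sketch may help. It is not from the thread and rests on stated assumptions: all columns numeric at 8 bytes per value, and H2O needing about 4x the raw data size in cluster memory (a commonly quoted rule of thumb, exposed here as the `overhead` parameter). H2O's isolation forest already trains each tree on a small subsample (`sample_size`, 256 rows by default), so training on a random sample of the DataFrame may be worth testing before scaling up. The helper names are hypothetical.

```python
def max_sample_fraction(rows, cols, memory_bytes, overhead=4.0, bytes_per_value=8):
    """Largest fraction of rows whose estimated H2O footprint fits in memory_bytes.

    Assumptions (not measured): every column is numeric at bytes_per_value
    bytes, and H2O needs roughly `overhead` times the raw size in memory.
    """
    needed = rows * cols * bytes_per_value * overhead
    return min(1.0, memory_bytes / needed)


def sample_for_h2o(df, fraction, seed=42):
    """Random row sample of a PySpark DataFrame, without replacement."""
    return df.sample(withReplacement=False, fraction=fraction, seed=seed)


# Treating the whole 30 GB as available to H2O is optimistic, since Spark
# and the driver need memory too.
frac = max_sample_fraction(30_448_674, 213, memory_bytes=30 * 10**9)
print(f"sample fraction that fits: {frac:.3f}")
```

Under these assumptions only about 14% of the rows would fit, which suggests either a substantially larger cluster or training on a sample, e.g. `hc.asH2OFrame(sample_for_h2o(df, frac))`. Whether a sample of that size is representative enough for this data still needs to be checked.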