Hello dear community,
I wrote some ETL functions, e.g. to count the sessions until a conversion (see below). For that I load the data and then execute several small functions for the feature generation.
When I run the function feat_session_unitl_conversion on a data set with approximately 5000 rows in a Jupyter notebook, it takes about 5 seconds. When I run it in a Python script as a job (embedded in the ETL flow) using the same data set, it takes about 1 minute. I would like to understand why it is 12x slower in the Python script than in the Jupyter notebook.
For both I use the same cluster with the following specifications:
- 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12)
- i3.xlarge
- min workers 2, max workers 8
- autoscaling enabled
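Roughly how I time the call in both environments is shown below (logger and table name are placeholders here; the trailing count() only forces Spark to evaluate the lazy plan so that both runs measure the same work):

import time
import logging
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
logger = logging.getLogger("etl_job")  # placeholder logger
df_all = spark.table("some_schema.sessions")  # placeholder table name

start = time.time()
df_feat, conversion_types = feat_session_unitl_conversion(df_all, logger)
df_feat.count()  # force evaluation so notebook and job time the same work
logger.info("feat_session_unitl_conversion took %.1f s", time.time() - start)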
Looking forward to the Databricks swarm intelligence!
"""
Module to count sessions until conversion
"""
import logging
from pyspark.sql import DataFrame as SDF
from pyspark.sql import Window
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, lag, sum, when, broadcast
from configs import etl_configs
import src.helper_functions.spark_col_in_sdf as func_col_in_sdf
import src.helper_functions.spark_return_unique_values as func_return_unique_values
def feat_session_unitl_conversion(df_all: SDF, logger: logging.Logger):
"""
Function to count sessions until conversions.
    If no previous conversion exists, the count starts at the user's first session
    and runs up to the conversion session; the conversion session itself is excluded.
    If a previous conversion exists, the count restarts at that conversion session and
    runs up to the next conversion session, again excluding the conversion session.
    Parameters:
        df_all (SDF): Spark dataframe with the session data
        logger (logging.Logger): logger object
    Returns:
        df_all (SDF): Spark dataframe with one added count column per conversion type
        conversion_types (list): sorted list of the distinct conversion types
"""
logger.info("Count sessions until conversion")
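    # config values may be fully qualified ("table.column"); rsplit keeps only the column name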
col_session = etl_configs.col_session_id.rsplit(".", 1)[-1]
col_timestamp = etl_configs.col_timestamp
col_user_id = etl_configs.col_user_id.rsplit(".", 1)[-1]
col_conv_name = etl_configs.col_data_conv_name
col_data_conv_name = etl_configs.col_data_conv_name.rsplit(".", 1)[-1]
if func_col_in_sdf.col_in_sdf(df_all, col_conv_name) is False:
raise ValueError(
f"Column {col_conv_name} not in dataframe - is needed to execuet feat_session_unitl_conversion"
)
conversion_types = func_return_unique_values.return_list_of_unique_values(df_all, col_data_conv_name)
conversion_types.sort()
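    # window over each user's rows, ordered by event timestamp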
window_spec = Window.partitionBy(col_user_id).orderBy(col_timestamp)
    # Cumulative count of session changes per user: +1 whenever the session id differs from the previous row
data_sess = df_all.select(
col_timestamp,
col_conv_name,
col_user_id,
col_session,
sum(
when(
col(col_session) != lag(col(col_session)).over(window_spec), 1
).otherwise(0)
)
.over(window_spec)
.alias("cumcount"),
)
data_sess = data_sess.withColumn(
"prev_conv_name", lag("conv_name").over(window_spec)
)
# Iterate over conversion types
for conv_type in conversion_types:
col_name = f"{etl_configs.col_count_sessions_until_conversion}_{conv_type}"
df_conv_type = data_sess.filter(col(col_conv_name) == conv_type)
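        # keep one row per user/session pair for this conversion type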
df_conv_type = df_conv_type.dropDuplicates(subset=[col_user_id, col_session])
        # Window per user ordered by timestamp (same definition as above)
window_spec = Window.partitionBy(col_user_id).orderBy(col_timestamp)
# Use the lag function to get the previous value
df_conv_type = df_conv_type.withColumn(
"prev_value", lag("cumcount").over(window_spec)
)
# Calculate the rolling difference between the current value and the previous value
df_conv_type = df_conv_type.withColumn(
col_name, col("cumcount") - col("prev_value")
)
df_conv_type = df_conv_type.withColumn(
col_name,
when(col("prev_value").isNull(), col("cumcount")).otherwise(col(col_name)),
)
df_conv_type = df_conv_type.select(
            [col_session, col_user_id, col_name]
)
df_conv_type = df_conv_type.withColumn(
col_name, col(col_name).cast(IntegerType())
)
        # join the per-conversion-type counts back onto the full dataframe
df_all = df_all.join(
broadcast(df_conv_type), on=[col_session, col_user_id], how="left"
)
df_all = df_all.orderBy(col_user_id, col_timestamp)
return df_all, conversion_types
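The two helper modules are not shown above; they are small wrappers that do roughly the following (simplified sketches, not the exact implementations):

from pyspark.sql import DataFrame as SDF

def col_in_sdf(df: SDF, col_name: str) -> bool:
    # True if the column exists in the dataframe
    return col_name in df.columns

def return_list_of_unique_values(df: SDF, col_name: str) -> list:
    # collect the distinct values of a column into a Python list
    return [row[col_name] for row in df.select(col_name).distinct().collect()]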