Data Engineering
Join discussions on data engineering best practices, architectures, and optimization strategies within the Databricks Community. Exchange insights and solutions with fellow data engineers.

Function in Jupyter notebook 12× faster than in Python script

Karo
New Contributor

Hello dear community,

I wrote some ETL functions, e.g. to count the sessions until a conversion (see below). For that, I load the data and then run several small functions for the feature generation.

When I run the function feat_session_unitl_conversion on a dataset of approximately 5,000 rows in a Jupyter notebook, it takes about 5 seconds. When I run it in a Python script as a job (embedded in the ETL flow) on the same dataset, it takes about 1 minute. I would like to understand why it is 12× slower in the Python script than in the Jupyter notebook.
For both I use the same cluster with the following specifications:

  • 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12)
  • i3.xlarge
  • min workers 2, max workers 8
  • autoscaling enabled
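Since Spark evaluates lazily, I tried to make sure both timings actually include an action, not just building the plan. A minimal timing helper I used (plain Python, nothing Databricks-specific; the commented usage below assumes the function and variables from my code further down):

```python
import time

class Timer:
    """Context manager that records the wall-clock time of the enclosed block."""

    def __enter__(self):
        self.start = time.perf_counter()
        return self

    def __exit__(self, *exc):
        self.elapsed = time.perf_counter() - self.start
        return False  # do not swallow exceptions

# Usage sketch: force materialization so the timing covers real work,
# not just the construction of the lazy plan:
# with Timer() as t:
#     df_out, _ = feat_session_unitl_conversion(df_all, logger)
#     df_out.count()  # action that triggers execution
# print(f"elapsed: {t.elapsed:.2f}s")
```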

Looking forward to the Databricks swarm intelligence 🙂

 

"""
Module to count sessions until conversion
"""
import logging
from pyspark.sql import DataFrame as SDF
from pyspark.sql import Window
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, lag, sum, when, broadcast
from configs import etl_configs
import src.helper_functions.spark_col_in_sdf as func_col_in_sdf
import src.helper_functions.spark_return_unique_values as func_return_unique_values

def feat_session_unitl_conversion(df_all: SDF, logger: logging.Logger):
    """
    Count sessions until conversion.
    If no conversion exists yet, the count starts at the first session and runs
    until the conversion session is reached; the conversion session itself is excluded.
    After a conversion, the count restarts at that conversion session and runs
    until the next conversion session, again excluding the last conversion session.
    Parameters:
        df_all (SDF): Spark dataframe
        logger (logging.Logger): logger object
    Returns:
        df_all (SDF): Spark dataframe with added columns
        conversion_types (list): sorted list of conversion types found in the data
    """
    logger.info("Count sessions until conversion")
    # rsplit(".", 1)[-1] keeps only the part after the last dot,
    # e.g. "events.session_id" -> "session_id"; undotted names pass through unchanged
    col_session = etl_configs.col_session_id.rsplit(".", 1)[-1]
    col_timestamp = etl_configs.col_timestamp
    col_user_id = etl_configs.col_user_id.rsplit(".", 1)[-1]
    col_conv_name = etl_configs.col_data_conv_name
    col_data_conv_name = etl_configs.col_data_conv_name.rsplit(".", 1)[-1]
    
    if not func_col_in_sdf.col_in_sdf(df_all, col_conv_name):
        raise ValueError(
            f"Column {col_conv_name} not in dataframe - it is needed to execute feat_session_unitl_conversion"
        )
    conversion_types = func_return_unique_values.return_list_of_unique_values(df_all, col_data_conv_name)
    conversion_types.sort()

    window_spec = Window.partitionBy(col_user_id).orderBy(col_timestamp)
    # Calculate the cumulative count without duplicates
    data_sess = df_all.select(
        col_timestamp,
        col_conv_name,
        col_user_id,
        col_session,
        sum(
            when(
                col(col_session) != lag(col(col_session)).over(window_spec), 1
            ).otherwise(0)
        )
        .over(window_spec)
        .alias("cumcount"),
    )

    # note: prev_conv_name is currently not used anywhere below
    data_sess = data_sess.withColumn(
        "prev_conv_name", lag(col_data_conv_name).over(window_spec)
    )
    # Iterate over conversion types
    for conv_type in conversion_types:
        col_name = f"{etl_configs.col_count_sessions_until_conversion}_{conv_type}"
        df_conv_type = data_sess.filter(col(col_conv_name) == conv_type)
        df_conv_type = df_conv_type.dropDuplicates(subset=[col_user_id, col_session])
        # Use lag over the same per-user window (ordered by timestamp)
        # to get the previous cumulative count
        df_conv_type = df_conv_type.withColumn(
            "prev_value", lag("cumcount").over(window_spec)
        )
        # Calculate the rolling difference between the current value and the previous value
        df_conv_type = df_conv_type.withColumn(
            col_name, col("cumcount") - col("prev_value")
        )
        df_conv_type = df_conv_type.withColumn(
            col_name,
            when(col("prev_value").isNull(), col("cumcount")).otherwise(col(col_name)),
        )
        df_conv_type = df_conv_type.select(
            # select via col_name so the column always matches the configured prefix
            [col_session, col_user_id, col_name]
        )
        df_conv_type = df_conv_type.withColumn(
            col_name, col(col_name).cast(IntegerType())
        )
        # join data
        df_all = df_all.join(
            broadcast(df_conv_type), on=[col_session, col_user_id], how="left"
        )
    df_all = df_all.orderBy(col_user_id, col_timestamp)
    return df_all, conversion_types
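As a toy illustration of why a notebook timing can mislead here (pure Python, not Spark itself, just an analogy): building a lazy pipeline only records steps, and the cost is paid when the action runs. A cell timed without an action reports only the cheap build phase:

```python
import time

class LazyPipeline:
    """Toy stand-in for a lazy DataFrame: adding steps only records them;
    collect() is the action that actually pays the cost (simulated by sleep)."""

    def __init__(self, steps=None):
        self.steps = steps or []

    def with_step(self, name):
        # Plan building: effectively free, like chaining withColumn/join calls.
        return LazyPipeline(self.steps + [name])

    def collect(self):
        # The action: only here is the recorded work executed.
        for _ in self.steps:
            time.sleep(0.001)  # pretend each step costs something
        return len(self.steps)

pipe = LazyPipeline()
t0 = time.perf_counter()
for i in range(50):
    pipe = pipe.with_step(f"step_{i}")
build_time = time.perf_counter() - t0

t0 = time.perf_counter()
n_steps = pipe.collect()
run_time = time.perf_counter() - t0
# build_time is tiny; run_time carries the real cost -- so two environments can
# report very different numbers depending on whether an action was triggered.
```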

 

 
