Data Engineering

Function in Jupyter notebook 12x faster than in Python script

Karo
New Contributor

Hello dear community,

I wrote some ETL functions, e.g. to count the sessions until a conversion (see the function below). To do this, I load the data and then execute several small functions for the feature generation.

When I run the function feat_session_unitl_conversion on a data set with approximately 5000 rows in a Jupyter notebook, it takes about 5 seconds. When I run it in a Python script as a job (embedded in the ETL flow) using the same data set, it takes about 1 minute. I would like to understand why it is 12x slower in the Python script than in the Jupyter notebook.
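
This is roughly how I measure it in both environments (a minimal sketch; the count() is only there to force evaluation of the otherwise lazy plan, df_all and logger are prepared the same way in both cases):

import time

start = time.perf_counter()
df_out, conversion_types = feat_session_unitl_conversion(df_all, logger)
df_out.count()  # action to force evaluation of the lazy transformations
print(f"feat_session_unitl_conversion took {time.perf_counter() - start:.1f} s")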
For both I use the same cluster with the following specifications:

  • 13.3 LTS (includes Apache Spark 3.4.1, Scala 2.12)
  • i3.xlarge
  • min workers 2, max workers 8
  • autoscaling enabled

Looking forward to the Databricks swarm intelligence 🙂

 

"""
Module to count sessions until conversion
"""
import logging
from pyspark.sql import DataFrame as SDF
from pyspark.sql import Window
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import col, lag, sum, when, broadcast
from configs import etl_configs
import src.helper_functions.spark_col_in_sdf as func_col_in_sdf
import src.helper_functions.spark_return_unique_values as func_return_unique_values

def feat_session_unitl_conversion(df_all: SDF, logger: logging.Logger):
    """
    Count sessions until conversion.
    If no conversion exists yet, the count starts at the first session and runs
    until the conversion session is reached; the conversion session itself is excluded.
    If a conversion exists, the count restarts at that conversion session and runs
    until the next conversion session, again excluding the last conversion session.
    Parameters:
        df_all (SDF): spark dataframe
        logger (logging.Logger): logger object
    Returns:
        df_all (SDF): spark dataframe with added columns
        conversion_types (list): sorted list of conversion types found in the data
    """
    logger.info("Count sessions until conversion")
    col_session = etl_configs.col_session_id.rsplit(".", 1)[-1]
    col_timestamp = etl_configs.col_timestamp
    col_user_id = etl_configs.col_user_id.rsplit(".", 1)[-1]
    col_conv_name = etl_configs.col_data_conv_name
    col_data_conv_name = etl_configs.col_data_conv_name.rsplit(".", 1)[-1]
    
    if func_col_in_sdf.col_in_sdf(df_all, col_conv_name) is False:
        raise ValueError(
            f"Column {col_conv_name} not in dataframe - is needed to  execuet feat_session_unitl_conversion"
        )
    conversion_types = func_return_unique_values.return_list_of_unique_values(df_all, col_data_conv_name)
    conversion_types.sort()

    window_spec = Window.partitionBy(col_user_id).orderBy(col_timestamp)
    # Calculate the cumulative count without duplicates
    data_sess = df_all.select(
        col_timestamp,
        col_conv_name,
        col_user_id,
        col_session,
        sum(
            when(
                col(col_session) != lag(col(col_session)).over(window_spec), 1
            ).otherwise(0)
        )
        .over(window_spec)
        .alias("cumcount"),
    )

    data_sess = data_sess.withColumn(
        "prev_conv_name", lag("conv_name").over(window_spec)
    )
    # Iterate over conversion types
    for conv_type in conversion_types:
        col_name = f"{etl_configs.col_count_sessions_until_conversion}_{conv_type}"
        df_conv_type = data_sess.filter(col(col_conv_name) == conv_type)
        df_conv_type = df_conv_type.dropDuplicates(subset=[col_user_id, col_session])
        # Define a window specification to order by the row
        window_spec = Window.partitionBy(col_user_id).orderBy(col_timestamp)
        # Use the lag function to get the previous value
        df_conv_type = df_conv_type.withColumn(
            "prev_value", lag("cumcount").over(window_spec)
        )
        # Calculate the rolling difference between the current value and the previous value
        df_conv_type = df_conv_type.withColumn(
            col_name, col("cumcount") - col("prev_value")
        )
        df_conv_type = df_conv_type.withColumn(
            col_name,
            when(col("prev_value").isNull(), col("cumcount")).otherwise(col(col_name)),
        )
        df_conv_type = df_conv_type.select(
            [col_session, col_user_id, col_name]
        )
        df_conv_type = df_conv_type.withColumn(
            col_name, col(col_name).cast(IntegerType())
        )
        # join data
        df_all = df_all.join(
            broadcast(df_conv_type), on=[col_session, col_user_id], how="left"
        )
    df_all = df_all.orderBy(col_user_id, col_timestamp)
    return df_all, conversion_types
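
For context, this is roughly how the function sits inside the ETL job (a simplified sketch; the source path, target table and logger setup are placeholders, not the real pipeline; spark is the session provided by the Databricks runtime):

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("etl_job")

# load the prepared session-level data (path is illustrative)
df_all = spark.read.format("delta").load("/mnt/etl/sessions")

# feature generation step
df_all, conversion_types = feat_session_unitl_conversion(df_all, logger)

# persist the result (table name is illustrative)
df_all.write.mode("overwrite").saveAsTable("features.sessions_until_conversion")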

 

 
