Data Engineering

Pandas_UDF not working on shared access mode but works on personal cluster

Awoke101
New Contributor II

 

The "dense_vector" column is not displayed by show(); instead I get the error below. Any idea why this doesn't work in shared access mode? Are there any alternatives?

 

from fastembed import TextEmbedding, SparseTextEmbedding
from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType
import pandas as pd
from pyspark.sql.functions import col

@pandas_udf(ArrayType(FloatType()))
def generate_dense_embeddings(contents: pd.Series) ->  pd.Series:
    small_embedding_model = TextEmbedding(model_name="BAAI/bge-small-en-v1.5", cache_dir="/tmp/local_cache/")
    dense_embeddings_list = small_embedding_model.embed(contents)
    return pd.Series(list(dense_embeddings_list))

df=df.limit(50)
df.show(10)
embeddings = df.withColumn("dense_vector", generate_dense_embeddings(col("content")))
embeddings.show(10)
Py4JJavaError: An error occurred while calling o474.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 4 times, most recent failure: Lost task 0.3 in stage 21.0 (TID 28) (172.16.2.140 executor 0): org.apache.spark.SparkRuntimeException: [UDF_ERROR.ENV_LOST] Execution of function generate_dense_embeddings(content#73) failed  - the execution environment was lost during execution. This may be caused by the code crashing or the process exiting prematurely.

 

1 ACCEPTED SOLUTION


@jacovangelder thanks but the error was solved by adding this in my UDF.

user = os.environ.get("USER")

 


8 REPLIES

jacovangelder
Contributor III

Not 100% sure, but I'm guessing it is because of the cache_dir. Shared access mode clusters are meant for Unity Catalog (UC), so the cache should point to a UC volume instead of a local path. Can you try changing it to a UC volume?

I got this error along with the one above, even though the model is cached in the UC volume.

from fastembed import TextEmbedding, SparseTextEmbedding
from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType
import pandas as pd

@pandas_udf(ArrayType(FloatType()))
def generate_dense_embeddings(contents: pd.Series) ->  pd.Series:
    small_embedding_model = TextEmbedding(model_name='BAAI/bge-base-en',cache_dir="/Volumes/qdrant_cache/default/model_cache/")
    dense_embeddings_list = small_embedding_model.embed(contents)
    return pd.Series(list(dense_embeddings_list))

from pyspark.sql.functions import col

df=df.limit(50)
df.show(10)
embeddings = df.withColumn("dense_vector", generate_dense_embeddings(col("content")))
embeddings.show(10)
Write not supported
Files in Repos are currently read-only. Please try writing to /tmp/<filename>. Alternatively, contact your Databricks representative to enable programmatically writing to files in a repository.

Awoke101
New Contributor II

@jacovangelder It throws the same error without cache_dir but will try with UC volumes.

Hmm interesting, then it's something else. 

The below code works for me on a Shared access mode cluster. (I don't know what your input dataset looks like):

df = spark.sql("SELECT '1' as content")

from fastembed import TextEmbedding, SparseTextEmbedding
from pyspark.sql.pandas.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType, IntegerType
import pandas as pd
from pyspark.sql.functions import col

@pandas_udf(ArrayType(FloatType()))
def generate_dense_embeddings(contents: pd.Series) ->  pd.Series:
    small_embedding_model = TextEmbedding(model_name="BAAI/bge-small-en-v1.5", cache_dir="/tmp/local_cache/")
    dense_embeddings_list = small_embedding_model.embed(contents)
    return pd.Series(list(dense_embeddings_list))

df=df.limit(50)
df.show(10)
embeddings = df.withColumn("dense_vector", generate_dense_embeddings(col("content")))
embeddings.show(10)

Are you sure your cluster setup is sufficient for what you're trying to achieve?

@jacovangelder I think the resources are sufficient, since it works on the personal cluster, which has fewer resources. I ran the code you sent on my shared access mode cluster and it still didn't work. Maybe I need to make some changes to the cluster config? This is my current shared cluster config.

EDIT: Can you also share the versions of your Python packages? I had to downgrade my numpy version for DBR to work, so that may also be the cause of this issue. Using fastembed v0.3.1 doesn't require a numpy downgrade, but it still doesn't work with DBR 13.3 LTS. I am getting pip warnings about version incompatibilities.
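For anyone comparing environments in a similar situation, a shorter alternative to a full pip freeze is to print only the packages under discussion. The sketch below uses only the standard library; the helper name `package_versions` and the list of package names are illustrative choices, not something from this thread. Run it in a notebook cell on each cluster and compare the output.

```python
from importlib.metadata import PackageNotFoundError, version


def package_versions(names):
    """Return a dict mapping each package name to its installed version, or None if absent."""
    found = {}
    for name in names:
        try:
            found[name] = version(name)
        except PackageNotFoundError:
            # The package is not installed in this Python environment.
            found[name] = None
    return found


# Package names chosen for illustration; adjust to whatever you are debugging.
for name, ver in package_versions(["fastembed", "numpy", "pandas", "pyspark"]).items():
    print(f"{name}=={ver}" if ver else f"{name}: not installed")
```

Note that this reports the environment the cell runs in, which may not be identical to the environment in which the UDF executes.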

jacovangelder
Contributor III

For some reason a moderator keeps removing my pip freeze output, no idea why. Maybe it's too long/spammy for a comment.
Anyway, I am using DBR 14.3 LTS with Shared Access Mode. I haven't installed any other version apart from fastembed==0.3.1. Included a screenshot of my cluster config too. 

@jacovangelder thanks but the error was solved by adding this in my UDF.

user = os.environ.get("USER")
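For readers who want to see where that line might go, here is a minimal sketch based on the UDF from the original question. The placement at the top of the UDF body, the `import os`, and the wrapper function `make_embedding_udf` are assumptions, since the post only says the line was added "in my UDF". The thread does not explain why the line helps, so treat this as something to try rather than a confirmed fix.

```python
import os


def make_embedding_udf():
    """Build the pandas UDF; call this from a notebook attached to the cluster."""
    # Imported here so pyspark and fastembed are only needed when the UDF is built.
    import pandas as pd
    from fastembed import TextEmbedding
    from pyspark.sql.functions import pandas_udf
    from pyspark.sql.types import ArrayType, FloatType

    @pandas_udf(ArrayType(FloatType()))
    def generate_dense_embeddings(contents: pd.Series) -> pd.Series:
        # Line from the accepted solution. The thread does not say why it helps,
        # and the value is not used afterwards.
        user = os.environ.get("USER")

        # Same model and cache directory as in the original question.
        model = TextEmbedding(
            model_name="BAAI/bge-small-en-v1.5",
            cache_dir="/tmp/local_cache/",
        )
        # embed() yields one vector per input text; collect them into a Series.
        return pd.Series(list(model.embed(contents.tolist())))

    return generate_dense_embeddings
```

It would be used as in the question: `generate_dense_embeddings = make_embedding_udf()`, followed by `df.withColumn("dense_vector", generate_dense_embeddings(col("content")))`.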

 

Glad it's resolved 🙂
