10-08-2024 06:56 PM
Hi Everyone
I tried to follow the same steps as the Topic from Text example, using similar data. However, when I try to fit the model on my data, I get this error:
IllegalArgumentException: requirement failed: Column features must be of type equal to one of the following types: [struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>] but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
My data:
Col => { "vec_json", "features" }
Row(vec_json='[{"type":0,"size":4927,"indices":[0,8,18,30,145,336,786,1231,1695,3653],"values":[2.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,2.0,1.0]}]', features=Row(type=0, size=4927, indices=[0, 8, 18, 30, 145, 336, 786, 1231, 1695, 3653], values=[2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0]))
10-08-2024 11:32 PM - edited 10-08-2024 11:33 PM
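Hi @amirA,
The LDA model expects the features column to be of type Vector from the pyspark.ml.linalg module (either a SparseVector or a DenseVector), whereas your column is a plain struct, which shows up as a Row. The struct has the same fields as a vector's internal layout, which is why the expected and actual types in the error message look identical, but it is not the vector type itself.
You need to convert each Row into a SparseVector.
Check this out: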
# Import required libraries
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, DoubleType, StringType
from pyspark.ml.linalg import SparseVector, Vectors, VectorUDT
from pyspark.ml.clustering import LDA
# Define schema for the input DataFrame
schema = StructType([
    StructField("vec_json", StringType(), True),
    StructField("features", StructType([
        StructField("type", IntegerType(), True),
        StructField("size", IntegerType(), True),
        StructField("indices", ArrayType(IntegerType()), True),
        StructField("values", ArrayType(DoubleType()), True)
    ]), True)
])
# Sample data
data = [
    ("[{\"type\":0,\"size\":4927,\"indices\":[0,8,18,30,145,336,786,1231,1695,3653],\"values\":[2.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,2.0,1.0]}]",
     (0, 4927, [0, 8, 18, 30, 145, 336, 786, 1231, 1695, 3653], [2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0]))
]
# Create DataFrame
df = spark.createDataFrame(data, schema)
# Define a UDF to convert struct to SparseVector
def convert_to_sparse_vector(features):
    return Vectors.sparse(features['size'], features['indices'], features['values'])
# Wrap the function as a UDF whose return type is a vector (VectorUDT)
convert_to_sparse_vector_udf = udf(convert_to_sparse_vector, VectorUDT())
# Convert the features column to SparseVector type
df_new = df.withColumn("features", convert_to_sparse_vector_udf(col("features")))
# Verify the schema: "features" should now be reported as type vector
df_new.printSchema()
# Show the DataFrame to see the changes
df_new.show(truncate=False)
# Fit the LDA model
lda_model = LDA(k=20, maxIter=20)
model = lda_model.fit(df_new)
# Print the topics
topics = model.describeTopics()
topics.show(truncate=False)
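If the struct column is not available and you only have the vec_json string, a similar conversion should work once the JSON is parsed. Here is a minimal sketch, assuming the string always has the shape shown in the question (a one-element list holding a sparse vector with type 0). Note that parse_vec_json is a hypothetical helper name, not part of PySpark:

```python
import json

def parse_vec_json(vec_json):
    """Return (size, indices, values) from a vec_json string (hypothetical helper)."""
    parsed = json.loads(vec_json)
    # Assumption: the vector object is wrapped in a one-element list, as in the question.
    obj = parsed[0] if isinstance(parsed, list) else parsed
    # Assumption: type 0 marks a sparse vector; anything else is rejected here.
    if obj.get("type") != 0:
        raise ValueError("expected a sparse vector (type 0)")
    indices = [int(i) for i in obj["indices"]]
    values = [float(v) for v in obj["values"]]
    return int(obj["size"]), indices, values

sample = '[{"type":0,"size":4927,"indices":[0,8,18],"values":[2.0,1.0,1.0]}]'
size, indices, values = parse_vec_json(sample)
print(size, indices, values)
```

The three returned values can then be passed to Vectors.sparse(size, indices, values) inside a UDF that returns VectorUDT(), in the same way as convert_to_sparse_vector above.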
10-09-2024 05:44 PM
Thanks @filipniziol for the quick response. Legend.
That's right, it needed to be converted to a sparse vector.
11-13-2024 11:47 PM
Thank you so much for the solution.