10-08-2024 06:56 PM
Hi Everyone
I tried to follow the same steps as in Topic from Text, using similar data. However, when I try to fit the model on my data, I get this error:
IllegalArgumentException: requirement failed: Column features must be of type equal to one of the following types: [struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>] but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
My data:
Col => { "vec_json", "features" }
Row(vec_json='[{"type":0,"size":4927,"indices":[0,8,18,30,145,336,786,1231,1695,3653],"values":[2.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,2.0,1.0]}]', features=Row(type=0, size=4927, indices=[0, 8, 18, 30, 145, 336, 786, 1231, 1695, 3653], values=[2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0]))
10-08-2024 11:32 PM - edited 10-08-2024 11:33 PM
Hi @amirA ,
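The LDA model expects the features column to be of type Vector from the pyspark.ml.linalg module (either a SparseVector or a DenseVector), whereas your column is a plain struct (a Row). The error message looks contradictory because, as far as I know, the vector type is stored internally as a struct with the same fields, so both types print the same way.
You need to convert each Row in the features column to a SparseVector.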
Check this out:
# Import required libraries
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, DoubleType, StringType
from pyspark.ml.linalg import SparseVector, Vectors, VectorUDT
from pyspark.ml.clustering import LDA
# Define schema for the input DataFrame
schema = StructType([
    StructField("vec_json", StringType(), True),
    StructField("features", StructType([
        StructField("type", IntegerType(), True),
        StructField("size", IntegerType(), True),
        StructField("indices", ArrayType(IntegerType()), True),
        StructField("values", ArrayType(DoubleType()), True)
    ]), True)
])
# Sample data
data = [
    ("[{\"type\":0,\"size\":4927,\"indices\":[0,8,18,30,145,336,786,1231,1695,3653],\"values\":[2.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,2.0,1.0]}]",
     (0, 4927, [0, 8, 18, 30, 145, 336, 786, 1231, 1695, 3653], [2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0]))
]
# Create DataFrame
df = spark.createDataFrame(data, schema)
# Define a UDF to convert struct to SparseVector
def convert_to_sparse_vector(features):
    return Vectors.sparse(features['size'], features['indices'], features['values'])
# Register the UDF with the correct return type
convert_to_sparse_vector_udf = udf(convert_to_sparse_vector, VectorUDT())
# Convert the features column to SparseVector type
df_new = df.withColumn("features", convert_to_sparse_vector_udf(col("features")))
# Verify the schema to confirm that "features" is now a SparseVector
df_new.printSchema()
# Show the DataFrame to see the changes
df_new.show(truncate=False)
# Fit the LDA model
lda_model = LDA(k=20, maxIter=20)
model = lda_model.fit(df_new)
# Print the topics
topics = model.describeTopics()
topics.show(truncate=False)
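If you only have the vec_json string column and not the struct, the same conversion can start from the JSON instead. Below is a minimal sketch, assuming each vec_json value is a one-element list shaped like the sample row above. The helper name parse_vec_json is hypothetical (it is not part of PySpark), and the validation rules are my own assumptions about what a sparse vector should satisfy.

```python
import json


def parse_vec_json(vec_json):
    """Parse a vec_json string into (size, indices, values).

    Hypothetical helper: assumes the JSON is either a one-element list
    wrapping the vector object (as in the sample row) or the bare object.
    """
    parsed = json.loads(vec_json)
    record = parsed[0] if isinstance(parsed, list) else parsed

    size = int(record["size"])
    raw_indices = record["indices"]
    raw_values = record["values"]

    # A sparse vector needs exactly one value per index.
    if len(raw_indices) != len(raw_values):
        raise ValueError("indices and values must have the same length")

    # Sort the pairs by index so the indices come out in increasing order.
    pairs = sorted(zip(raw_indices, raw_values))
    indices = [int(i) for i, _ in pairs]
    values = [float(v) for _, v in pairs]

    # Every index must fall inside the declared vector size.
    if indices and (indices[0] < 0 or indices[-1] >= size):
        raise ValueError("index out of range for declared size")

    return size, indices, values


sample = '[{"type":0,"size":4927,"indices":[0,8,18],"values":[2.0,1.0,1.0]}]'
size, indices, values = parse_vec_json(sample)
```

The three returned pieces can then be passed to Vectors.sparse(size, indices, values) inside a UDF with a VectorUDT() return type, the same way convert_to_sparse_vector is wrapped above.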
10-09-2024 05:44 PM
Thanks @filipniziol for quick response. Legend.
That's right, it needed to be converted to a sparse vector.
a month ago
Thank you so much for the solution.