Hi @amirA ,
The LDA model expects the features column to be of type Vector from the pyspark.ml.linalg module — specifically a SparseVector or DenseVector — whereas you have provided a Row object.
You need to convert your Row object to SparseVector.
Check this out:
# Import required libraries
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, DoubleType, StringType
from pyspark.ml.linalg import SparseVector, Vectors, VectorUDT
from pyspark.ml.clustering import LDA
# Schema of the nested "features" struct: mirrors the JSON layout of a
# serialized SparseVector (type, size, indices, values).
features_struct = StructType([
    StructField("type", IntegerType(), True),
    StructField("size", IntegerType(), True),
    StructField("indices", ArrayType(IntegerType()), True),
    StructField("values", ArrayType(DoubleType()), True),
])

# Top-level schema: the raw JSON string plus the parsed struct column.
schema = StructType([
    StructField("vec_json", StringType(), True),
    StructField("features", features_struct, True),
])

# One sample row reproducing the asker's data.
sample_json = '[{"type":0,"size":4927,"indices":[0,8,18,30,145,336,786,1231,1695,3653],"values":[2.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,2.0,1.0]}]'
sample_struct = (
    0,
    4927,
    [0, 8, 18, 30, 145, 336, 786, 1231, 1695, 3653],
    [2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0],
)
data = [(sample_json, sample_struct)]

# Build the DataFrame from the sample data.
df = spark.createDataFrame(data, schema)
def convert_to_sparse_vector(features):
    """Convert a (type, size, indices, values) struct Row into a SparseVector.

    Args:
        features: a Row (or None) with fields 'size', 'indices', 'values'.

    Returns:
        pyspark.ml.linalg.SparseVector, or None when the input row is null
        (the schema declares the struct nullable, so a null guard keeps the
        UDF from raising a TypeError on missing rows).
    """
    if features is None:
        return None
    return Vectors.sparse(features['size'], features['indices'], features['values'])
# Wrap the converter in a UDF whose declared return type is VectorUDT,
# so Spark treats the output column as a true ML Vector.
sparse_udf = udf(convert_to_sparse_vector, VectorUDT())

# Overwrite the struct column with its SparseVector equivalent.
vec_df = df.withColumn("features", sparse_udf(col("features")))

# Confirm the column is now vector-typed and inspect the data.
vec_df.printSchema()
vec_df.show(truncate=False)

# Train LDA — it reads the default "features" column.
lda = LDA(k=20, maxIter=20)
fitted = lda.fit(vec_df)

# Display the discovered topics.
fitted.describeTopics().show(truncate=False)