Extracting Topics From Text Data Using PySpark

amirA
New Contributor II

Hi everyone,
I tried to follow the same steps as in Topic from Text, using data similar to the example. However, when I try to fit the model to the data, I get this error:
IllegalArgumentException: requirement failed: Column features must be of type equal to one of the following types: [struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>] but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
My data:
Col => { "vec_json", "features" }

Row(vec_json='[{"type":0,"size":4927,"indices":[0,8,18,30,145,336,786,1231,1695,3653],"values":[2.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,2.0,1.0]}]', features=Row(type=0, size=4927, indices=[0, 8, 18, 30, 145, 336, 786, 1231, 1695, 3653], values=[2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0]))

lda_model = LDA(k=20, maxIter=20)
model = lda_model.fit(df_new)
Many thanks in advance.
Regards
Amir

filipniziol
Esteemed Contributor

Hi @amirA ,

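The LDA model expects the features column to be of type Vector from the pyspark.ml.linalg module (either a SparseVector or a DenseVector), whereas your features column is a plain struct (each row holds a Row object). The error message is confusing because the expected vector type is printed as its underlying struct representation, so the expected and actual types look identical even though they are not.
You need to convert the struct in each row to a SparseVector.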

Here is a complete example:

# Import required libraries
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, DoubleType, StringType
from pyspark.ml.linalg import SparseVector, Vectors, VectorUDT
from pyspark.ml.clustering import LDA

# Define schema for the input DataFrame
schema = StructType([
    StructField("vec_json", StringType(), True),
    StructField("features", StructType([
        StructField("type", IntegerType(), True),
        StructField("size", IntegerType(), True),
        StructField("indices", ArrayType(IntegerType()), True),
        StructField("values", ArrayType(DoubleType()), True)
    ]), True)
])

# Sample data
data = [
    ("[{\"type\":0,\"size\":4927,\"indices\":[0,8,18,30,145,336,786,1231,1695,3653],\"values\":[2.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,2.0,1.0]}]",
     (0, 4927, [0, 8, 18, 30, 145, 336, 786, 1231, 1695, 3653], [2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0]))
]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Define a UDF to convert struct to SparseVector
def convert_to_sparse_vector(features):
    return Vectors.sparse(features['size'], features['indices'], features['values'])

# Wrap the function as a UDF with the correct return type (VectorUDT)
convert_to_sparse_vector_udf = udf(convert_to_sparse_vector, VectorUDT())

# Convert the features column to SparseVector type
df_new = df.withColumn("features", convert_to_sparse_vector_udf(col("features")))

# Verify the schema to confirm that "features" is now of type vector
df_new.printSchema()

# Show the DataFrame to see the changes
df_new.show(truncate=False)

# Fit the LDA model
lda_model = LDA(k=20, maxIter=20)
model = lda_model.fit(df_new)

# Print the topics
topics = model.describeTopics()
topics.show(truncate=False)
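
If only the vec_json string column were available, the same pieces could be recovered from the JSON text instead of the struct. The following is a minimal sketch in plain Python, not part of the solution above: the helper name vector_parts_from_json is hypothetical, and it assumes the JSON follows the layout in the question, where type 0 marks a sparse vector and type 1 a dense one (with no indices stored).

```python
import json

def vector_parts_from_json(vec_json):
    """Hypothetical helper: return (size, indices, values) parsed from a vec_json string."""
    parsed = json.loads(vec_json)
    # The sample data wraps the vector in a one-element list; accept a bare object too.
    entry = parsed[0] if isinstance(parsed, list) else parsed
    values = [float(v) for v in entry["values"]]
    if entry["type"] == 1:
        # Dense encoding: every position has a value, so the indices are 0..n-1.
        return len(values), list(range(len(values))), values
    indices = [int(i) for i in entry["indices"]]
    if len(indices) != len(values):
        raise ValueError("indices and values must have the same length")
    if any(a >= b for a, b in zip(indices, indices[1:])):
        raise ValueError("indices must be strictly increasing")
    return int(entry["size"]), indices, values

# Shortened version of the row from the question
sample = '[{"type":0,"size":4927,"indices":[0,8,18],"values":[2.0,1.0,1.0]}]'
size, indices, values = vector_parts_from_json(sample)
print(size, indices, values)
```

In a Spark session, the returned tuple could be passed to Vectors.sparse(size, indices, values) inside a UDF declared with VectorUDT() as its return type, in the same way as convert_to_sparse_vector above. The checks on the indices are there so that malformed rows fail with a readable message before the vector is built.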

 


amirA
New Contributor II

Thanks @filipniziol for the quick response. Legend.
That's right, it needed to be converted to a sparse vector.

Thank you so much for the solution.
