Machine Learning
Dive into the world of machine learning on the Databricks platform. Explore discussions on algorithms, model training, deployment, and more. Connect with ML enthusiasts and experts.

Extracting Topics From Text Data Using PySpark

amirA
New Contributor II

Hi everyone,
I tried to follow the same steps as in the Topic from Text example on similar data. However, when I try to fit the model I get this error:
IllegalArgumentException: requirement failed: Column features must be of type equal to one of the following types: [struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, array<double>, array<float>] but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.
My data (two columns, vec_json and features):

Row(vec_json='[{"type":0,"size":4927,"indices":[0,8,18,30,145,336,786,1231,1695,3653],"values":[2.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,2.0,1.0]}]', features=Row(type=0, size=4927, indices=[0, 8, 18, 30, 145, 336, 786, 1231, 1695, 3653], values=[2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0]))

lda_model = LDA(k=20, maxIter=20)
model = lda_model.fit(df_new)
Many thanks in advance.
Regards,
Amir
1 ACCEPTED SOLUTION

Accepted Solutions

filipniziol
Contributor

Hi @amirA ,

The LDA model expects the features column to be of type Vector from the pyspark.ml.linalg module, specifically either a SparseVector or a DenseVector, whereas you have provided a plain struct (a Row). The error message looks contradictory because VectorUDT is serialized as exactly that struct<type:tinyint,size:int,indices:array<int>,values:array<double>>, so the "expected" and "actual" types print identically; what your column is missing is the vector type itself.
You need to convert your Row object to a SparseVector.

Check this out:

 

# Import required libraries
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StructType, StructField, IntegerType, ArrayType, DoubleType, StringType
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.clustering import LDA

# Define schema for the input DataFrame
schema = StructType([
    StructField("vec_json", StringType(), True),
    StructField("features", StructType([
        StructField("type", IntegerType(), True),
        StructField("size", IntegerType(), True),
        StructField("indices", ArrayType(IntegerType()), True),
        StructField("values", ArrayType(DoubleType()), True)
    ]), True)
])

# Sample data
data = [
    ("[{\"type\":0,\"size\":4927,\"indices\":[0,8,18,30,145,336,786,1231,1695,3653],\"values\":[2.0,1.0,1.0,2.0,1.0,2.0,1.0,1.0,2.0,1.0]}]",
     (0, 4927, [0, 8, 18, 30, 145, 336, 786, 1231, 1695, 3653], [2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0]))
]

# Create DataFrame
df = spark.createDataFrame(data, schema)

# Define a UDF to convert struct to SparseVector
def convert_to_sparse_vector(features):
    return Vectors.sparse(features['size'], features['indices'], features['values'])

# Register the UDF with the correct return type
convert_to_sparse_vector_udf = udf(convert_to_sparse_vector, VectorUDT())

# Convert the features column to SparseVector type
df_new = df.withColumn("features", convert_to_sparse_vector_udf(col("features")))

# Verify the schema to confirm that "features" is now a SparseVector
df_new.printSchema()

# Show the DataFrame to see the changes
df_new.show(truncate=False)

# Fit the LDA model
lda_model = LDA(k=20, maxIter=20)
model = lda_model.fit(df_new)

# Print the topics
topics = model.describeTopics()
topics.show(truncate=False)

 


2 REPLIES 2


amirA
New Contributor II

Thanks @filipniziol for the quick response. Legend!
That's right, it needed to be converted to a sparse vector.
