# Add stop words
from pyspark.ml.feature import StopWordsRemover

add_stopwords = ["http", "https", "amp", "rt", "t", "c", "the", "@", "/", ":"]  # common web stop words
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(add_stopwords)
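If you also want to filter out common English words rather than only the web tokens above, Spark ships a built-in stop-word list that can be merged with the custom one; this is an optional variant, not part of the original setup.
# Optional: combine Spark's default English stop words with the custom web tokens
default_stopwords = StopWordsRemover.loadDefaultStopWords("english")
stopwordsRemover = StopWordsRemover(inputCol="words", outputCol="filtered").setStopWords(default_stopwords + add_stopwords)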
Count words
# Bag of Words count
from pyspark.ml.feature import CountVectorizer

countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=10000, minDF=5)
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[regexTokenizer, stopwordsRemover, countVectors, label_stringIdx])
# Fit the pipeline to training documents.
pipelineFit = pipeline.fit(data)
dataset = pipelineFit.transform(data)
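As a quick sanity check on the fitted pipeline, the learned vocabulary can be read off the CountVectorizer stage; the stage index below assumes the pipeline order defined above.
# countVectors is the third stage (index 2) in the fitted pipeline
vocab = pipelineFit.stages[2].vocabulary
print(vocab[:20])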
Once the transformation pipeline has been fit and applied, the resulting features and label columns can be fed to any of Spark's standard classification algorithms to classify the text.
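For example, the sketch below trains a logistic regression model on the pipeline output; the split ratio, seed, and hyperparameters are illustrative choices rather than part of the original pipeline.
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Hold out part of the transformed data for testing
(trainingData, testData) = dataset.randomSplit([0.7, 0.3], seed=100)

# Train a simple logistic regression model on the bag-of-words features
lr = LogisticRegression(featuresCol="features", labelCol="label", maxIter=20, regParam=0.3, elasticNetParam=0)
lrModel = lr.fit(trainingData)

# Score the held-out set and report accuracy
predictions = lrModel.transform(testData)
evaluator = MulticlassClassificationEvaluator(labelCol="label", predictionCol="prediction", metricName="accuracy")
print(evaluator.evaluate(predictions))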
Extras
Get the numeric label assigned to each class
from pyspark.sql.functions import col

labelset = dataset.select(col("class"), col("label")).distinct()
display(labelset)
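If you later need to map predicted label indices back to the original class names, the labels learned by the StringIndexer can be reused. This sketch assumes label_stringIdx is the last pipeline stage and uses the predictions DataFrame from the classification example above.
from pyspark.ml.feature import IndexToString

# Convert the numeric prediction back to the original class string using the fitted StringIndexer's label order
labelConverter = IndexToString(inputCol="prediction", outputCol="predicted_class",
                               labels=pipelineFit.stages[-1].labels)
display(labelConverter.transform(predictions).select("class", "predicted_class", "prediction"))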
Split text body into sentences
from pyspark.sql.window import Window
from pyspark.sql.functions import col, split, explode, row_number

# Split the text on "." into an array of sentences (split() already returns array<string>, so no cast is needed)
array_df = data.withColumn("text", split(col("text"), r"\."))

# Explode the array so each sentence becomes its own row, then number the rows within each message
split_df = array_df.withColumn("text", explode(col("text"))) \
    .withColumn("part_number", row_number().over(Window.partitionBy("internet_message_id").orderBy("id")))

data = split_df
display(data)
Create `part_number` for the split sentences
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

data.withColumn("part_number", row_number().over(Window.partitionBy("body_id").orderBy("id"))).show()
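One caveat: explode copies the parent row's id onto every sentence it produces, so ordering by id alone inside the window does not pin down the original sentence order. If that order matters, posexplode returns each sentence together with its position in the array. The sketch below is an alternative, not part of the flow above, and raw_df stands for the DataFrame as it was before the sentence split.
from pyspark.sql.window import Window
from pyspark.sql.functions import col, split, posexplode, row_number

# posexplode emits (position, value) pairs, so the original sentence order within each text is preserved
sentence_df = raw_df.withColumn("sentences", split(col("text"), r"\.")) \
    .select("*", posexplode(col("sentences")).alias("pos", "text_part")) \
    .drop("sentences", "text") \
    .withColumnRenamed("text_part", "text") \
    .withColumn("part_number", row_number().over(Window.partitionBy("body_id").orderBy("id", "pos"))) \
    .drop("pos")
display(sentence_df)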