Computing TF-IDF with Spark

TF-IDF Definition

TF-IDF (Term Frequency-Inverse Document Frequency) is a well-known text-mining algorithm that assigns a weight to each term in a document. The more often a term appears in a document, the more important it is to that document; but if the term also shows up across many documents in the corpus, it is a common word that says little about any particular document, and its weight should be reduced accordingly.
The term frequency TF measures how often a term occurs within a single document and is computed as:

$$
TF(x) = \frac{\text{number of occurrences of term } x \text{ in the document}}{\text{total number of terms in the document}}
$$

The inverse document frequency IDF measures how many documents a term appears in and is computed as:
$$
IDF(x) = \log\frac{\text{total number of documents in the corpus}}{\text{number of documents containing term } x + 1}
$$
The +1 in the denominator is a smoothing term that prevents division by zero when a term appears in no document.
The final TF-IDF score is simply the product of the two: $TF\text{-}IDF(x) = TF(x) \times IDF(x)$.
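
As a quick hand-worked example (my own numbers, chosen to mirror the toy corpus used in the code below): suppose the corpus contains 3 documents, and the term "spark" occurs 2 times in a 4-term document while appearing in only 1 of the 3 documents. With the natural logarithm:

$$
TF(\text{spark}) = \frac{2}{4} = 0.5,\qquad IDF(\text{spark}) = \ln\frac{3}{1+1} \approx 0.405,\qquad TF\text{-}IDF(\text{spark}) \approx 0.20
$$

This is roughly the score the single-machine program below should print for "spark" in its first document.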

Below are two implementations: a single-machine Python version and a distributed PySpark version.

Python TF-IDF program:

#! python3
# -*- coding: utf-8 -*-

import nltk
import ssl
import math
import string

from nltk.corpus import stopwords
from collections import Counter
from nltk.stem.porter import PorterStemmer

# Work around SSL certificate errors when downloading the NLTK data
try:
    _create_unverified_https_context = ssl._create_unverified_context
except AttributeError:
    pass
else:
    ssl._create_default_https_context = _create_unverified_https_context

nltk.download('punkt')
nltk.download('stopwords')

text1 = "I heard about Spark and I love Spark"

text2 = "I wish Java could use case classes"

text3 = "Logistic regression models are neat"


def get_tokens(text):
    # lowercase, strip punctuation, then tokenize
    lower = text.lower()
    remove_punctuation_map = dict((ord(char), None) for char in string.punctuation)
    no_punctuation = lower.translate(remove_punctuation_map)
    tokens = nltk.word_tokenize(no_punctuation)

    return tokens


def stem_tokens(tokens, stemmer):
    stemmed = []
    for item in tokens:
        stemmed.append(stemmer.stem(item))

    return stemmed


def tf(word, count):
    # term frequency: occurrences of the word / total words in the document
    tfv = count[word] / sum(count.values())
    return tfv


def n_containing(word, count_list):
    # number of documents that contain the word
    df = sum(1 for count in count_list if word in count)
    return df


def idf(word, count_list):
    # inverse document frequency, with +1 smoothing in the denominator
    idfv = math.log(len(count_list) / (1 + n_containing(word, count_list)))
    return idfv


def tfidf(word, count, count_list):
    return tf(word, count) * idf(word, count_list)


def count_term(text):
    tokens = get_tokens(text)
    filtered = [w for w in tokens if w not in stopwords.words('english')]  # remove stopwords
    stemmer = PorterStemmer()
    stemmed = stem_tokens(filtered, stemmer)
    count = Counter(stemmed)
    return count


def main():
    texts = [text1, text2, text3]
    countlist = []
    for text in texts:
        countlist.append(count_term(text))
    for i, count in enumerate(countlist):
        print("Top words in document {}".format(i + 1))
        scores = {word: tfidf(word, count, countlist) for word in count}
        sorted_words = sorted(scores.items(), key=lambda x: x[1], reverse=True)
        for word, score in sorted_words[:5]:
            print("\tWord: {}, TF-IDF: {}".format(word, round(score, 5)))


if __name__ == "__main__":
    main()

PySpark TF-IDF program:

#! python3
# -*- coding: utf-8 -*-
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("PySpark example").enableHiveSupport().getOrCreate()
spark.sparkContext.setLogLevel("WARN")

# Read data
df = spark.sql("SELECT img_label FROM sprs_log_basis.model_server_log WHERE datepart=20190425 LIMIT 10")
df.cache()
df.show()

# Each document is the list of keys of the img_label map column
df = df.select(F.map_keys("img_label").alias("keys"))

# Assign a document id to each row
df = df.withColumn("doc_id", F.monotonically_increasing_id())
NUM_doc = df.count()

# Explode the key list so that each row holds one (doc_id, token) pair
df = df.select('*', F.explode('keys').alias('token'))
df.show()

# Calculate TF: occurrences of a token in a document / total tokens in that document
TF = df.groupBy("doc_id").agg(F.count("token").alias("doc_len")) \
    .join(df.groupBy("doc_id", "token")
            .agg(F.count("keys").alias("word_count")), ['doc_id']) \
    .withColumn("tf", F.col("word_count") / F.col("doc_len")) \
    .drop("doc_len", "word_count")
TF.cache()

# Calculate IDF: log(total documents / (documents containing the token + 1))
IDF = df.groupBy("token").agg(F.countDistinct("doc_id").alias("df"))
IDF = IDF.select('*', (F.log(NUM_doc / (IDF['df'] + 1))).alias('idf'))
IDF.cache()

# Calculate TF-IDF = TF * IDF
TFIDF = TF.join(IDF, ['token']).withColumn('tf-idf', F.col('tf') * F.col('idf'))
TFIDF.show()
TFIDF.write.save("s3://***.tmp.ap-southeast-1/Default/hailin/here.csv", format='csv', header=True)
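
For comparison, Spark also ships with built-in TF-IDF transformers in pyspark.ml.feature. The sketch below is a minimal, illustrative example of that route on a toy in-memory DataFrame (the column names and sample sentences are my own, not taken from the job above); note that MLlib computes IDF as log((N + 1) / (df + 1)) and uses raw term counts for TF, so its scores will not exactly match the hand-rolled version.

from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("mllib tfidf sketch").getOrCreate()

# Toy corpus; in practice this would be the documents read in the job above
docs = spark.createDataFrame([
    (0, "I heard about Spark and I love Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic regression models are neat"),
], ["doc_id", "text"])

# Split each document into lowercase words
words = Tokenizer(inputCol="text", outputCol="words").transform(docs)

# Raw term counts per document (CountVectorizer keeps exact counts,
# unlike HashingTF, which hashes terms into a fixed-size vector)
tf = CountVectorizer(inputCol="words", outputCol="tf").fit(words).transform(words)

# IDF rescaling; MLlib uses log((numDocs + 1) / (docFreq + 1))
tfidf = IDF(inputCol="tf", outputCol="tfidf").fit(tf).transform(tf)
tfidf.select("doc_id", "tfidf").show(truncate=False)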

References:
http://www.tfidf.com/
https://dzone.com/articles/calculating-tf-idf-with-apache-spark
https://towardsdatascience.com/sentiment-analysis-with-pyspark-bc8e83f80c35
http://hejunhao.me/archives/856