# coding:utf8
from pyspark import SparkContext
from pyspark import RDD
import numpy as np
from numpy.random import RandomState

import sys
reload(sys)
# Set the default encoding to utf8: Chinese words taken out of the Spark RDD need to be
# encoded as Chinese text when written out, otherwise saving them to file fails.
sys.setdefaultencoding('utf8')


"""
Summary:

A broadcast variable and any function that uses it must be in the same scope.

Calling unpersist on a broadcast variable deletes the files backing it immediately.
If the RDD has not been evaluated yet at that point, the broadcast variable will be gone
by the time the RDD actually executes and the job will fail with an error.
It is therefore recommended to unpersist broadcast variables only after the whole program has finished.
"""

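# A minimal illustrative sketch of the pattern described above (not part of this class;
# `sc`, `rdd` and `b` are placeholder names):
#
#   b = sc.broadcast(10)
#   shifted = rdd.map(lambda x: x + b.value)  # the closure and the broadcast share one scope
#   shifted.collect()                         # trigger execution while the broadcast still exists
#   b.unpersist()                             # only unpersist after the job has completed
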
class PLSA:

    def __init__(self, data, sc, k, is_test=False, max_itr=1000, eta=1e-6):

        """
        init the algorithm

        :type data RDD
        :param data: input document RDD; each record is a series of space-separated words,
                     e.g. "我 爱 蓝天 我 爱 白云"
        :type max_itr int
        :param max_itr: maximum number of EM iterations
        :type is_test bool
        :param is_test: whether this is a test run; if so, rd = RandomState(1), otherwise rd = RandomState()
        :type sc SparkContext
        :param sc: spark context
        :type k int
        :param k: number of topics
        :type eta float
        :param eta: threshold; iteration stops when the change in log likelihood is smaller than eta
        :return: PLSA object
        """
        self.max_itr = max_itr
        self.k = sc.broadcast(k)
        self.ori_data = data.map(lambda x: x.split(' '))
        self.sc = sc
        self.eta = eta

        self.rd = sc.broadcast(RandomState(1) if is_test else RandomState())

    def train(self):

        # Build the vocabulary dict, e.g. {"我": 1}
        self.word_dict_b = self._init_dict_()
        # Convert the words of each document to their indices in the vocabulary dict
        self._convert_docs_to_word_index()
        # Initialize the word distribution of each topic
        self._init_probility_word_topic_()

        pre_l = self._log_likelyhood_()

        print "L(%d)=%.5f" % (0, pre_l)

        for i in range(self.max_itr):
            # Update the posterior topic distribution of each word
            self._E_step_()
            # Maximize the lower bound
            self._M_step_()
            now_l = self._log_likelyhood_()

            improve = np.abs((pre_l - now_l) / pre_l)
            pre_l = now_l

            print "L(%d)=%.5f with %.6f%% improvement" % (i + 1, now_l, improve * 100)
            if improve < self.eta:
                break

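    # M step (standard PLSA updates, matching the two helper functions below):
    #   p(z|d)  is proportional to  sum over w of  n(d,w) * p(z|d,w)
    #   p(w|z)  is proportional to  sum over d of  n(d,w) * p(z|d,w)
    # where n(d,w) is the count of word w in document d.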
    def _M_step_(self):
        """
        Update the parameters p(z=k|d) and p(w|z=k)
        :return: None
        """
        k = self.k
        v = self.v

        def update_probility_of_doc_topic(doc):
            """
            Update the topic distribution of a document
            """
            # reset the topic vector to zero while keeping its shape
            doc['topic'] = doc['topic'] - doc['topic']

            topic_doc = doc['topic']
            words = doc['words']
            for (word_index, word) in words.items():
                topic_doc += word['count'] * word['topic_word']
            topic_doc /= np.sum(topic_doc)

            return {'words': words, 'topic': topic_doc}

        self.data = self.data.map(update_probility_of_doc_topic)
        """
        An RDD is essentially a chain of transformations, each one nested inside the next.
        When this nesting gets deeper than roughly 60 levels Spark raises an error, so every
        M step uses cache to materialize the transformations accumulated so far.
        """
        self.data.cache()

        def update_probility_word_given_topic(doc):
            """
            Update the word distribution of each topic
            """
            probility_word_given_topic = np.matrix(np.zeros((k.value, v.value)))

            words = doc['words']
            for (word_index, word) in words.items():
                probility_word_given_topic[:, word_index] += np.matrix(word['count'] * word['topic_word']).T

            return probility_word_given_topic

        probility_word_given_topic = self.data.map(update_probility_word_given_topic).sum()
        probility_word_given_topic_row_sum = np.matrix(np.sum(probility_word_given_topic, axis=1))

        # normalize so the word probabilities under each topic sum to 1
        probility_word_given_topic = np.divide(probility_word_given_topic, probility_word_given_topic_row_sum)

        self.probility_word_given_topic = self.sc.broadcast(probility_word_given_topic)

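    # E step (standard PLSA): p(z|d,w) is proportional to p(w|z) * p(z|d), normalized over the k topics.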
    def _E_step_(self):
        """
        Update the latent variable p(z|w,d): the topic distribution of a word,
        given the document and the word
        :return: None
        """
        probility_word_given_topic = self.probility_word_given_topic
        k = self.k

        def update_probility_of_word_topic_given_word(doc):
            topic_doc = doc['topic']
            words = doc['words']

            for (word_index, word) in words.items():
                topic_word = word['topic_word']
                for i in range(k.value):
                    topic_word[i] = probility_word_given_topic.value[i, word_index] * topic_doc[i]
                # normalize so the topic probabilities of this word sum to 1
                topic_word /= np.sum(topic_word)
            return {'words': words, 'topic': topic_doc}

        self.data = self.data.map(update_probility_of_word_topic_given_word)

    def _init_probility_word_topic_(self):
        """
        init p(w|z=k)
        :return: None
        """
        # dict length (number of words in the dict)
        m = self.v.value

        probility_word_given_topic = self.rd.value.uniform(0, 1, (self.k.value, m))
        probility_word_given_topic_row_sum = np.matrix(np.sum(probility_word_given_topic, axis=1)).T

        # normalize so the word probabilities under each topic sum to 1
        probility_word_given_topic = np.divide(probility_word_given_topic, probility_word_given_topic_row_sum)

        self.probility_word_given_topic = self.sc.broadcast(probility_word_given_topic)

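    # Each document produced by _convert_docs_to_word_index has the structure
    #   {'words': {word_index: {'count': n(d,w), 'topic_word': p(z|d,w)}, ...},
    #    'topic': p(z|d)}
    # where both distributions start from random initial values.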
    def _convert_docs_to_word_index(self):

        word_dict_b = self.word_dict_b
        k = self.k
        rd = self.rd
        '''
        I wonder whether there is a better way to execute a function that uses a broadcast variable
        '''
        def _word_count_doc_(doc):
            wordcount = {}
            word_dict = word_dict_b.value
            for word in doc:
                if word_dict[word] in wordcount:
                    wordcount[word_dict[word]]['count'] += 1
                else:
                    # 'count' is the number of occurrences of the word
                    # 'topic_word' is p(z=k|w,d)
                    wordcount[word_dict[word]] = {'count': 1, 'topic_word': rd.value.uniform(0, 1, k.value)}

            topics = rd.value.uniform(0, 1, k.value)
            topics = topics / np.sum(topics)
            return {'words': wordcount, 'topic': topics}

        self.data = self.ori_data.map(_word_count_doc_)

    def _init_dict_(self):
        """
        init the word dict of the documents
        and broadcast it
        :return: broadcast variable holding the word dict
        """
        words = self.ori_data.flatMap(lambda d: d).distinct().collect()
        word_dict = {w: i for i, w in enumerate(words)}
        self.v = self.sc.broadcast(len(word_dict))
        return self.sc.broadcast(word_dict)

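    # Log likelihood of the corpus: L = sum over d,w of n(d,w) * log( sum over z of p(z|d) * p(w|z) )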
    def _log_likelyhood_(self):
        probility_word_given_topic = self.probility_word_given_topic
        k = self.k

        def likelyhood(doc):
            l = 0.0
            topic_doc = doc['topic']
            words = doc['words']

            for (word_index, word) in words.items():
                l += word['count'] * np.log(np.matrix(topic_doc) * probility_word_given_topic.value[:, word_index])
            return l
        return self.data.map(likelyhood).sum()

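    # Output format of save():
    #   f_word_given_topic: one line per word,     "<word> p(w|z=1) ... p(w|z=K)"
    #   f_doc_topic:        one line per document, "p(z=1|d) ... p(z=K|d)"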
    def save(self, f_word_given_topic, f_doc_topic):
        """
        Save the model results. TODO: add distributed saving of the results
        :param f_word_given_topic: path of the file for the word distribution of each topic
        :param f_doc_topic: path of the file for the topic distribution of each document
        :return:
        """
        doc_topic = self.data.map(lambda x: ' '.join([str(q) for q in x['topic'].tolist()])).collect()
        probility_word_given_topic = self.probility_word_given_topic.value

        word_dict = self.word_dict_b.value
        word_given_topic = []

        for w, i in word_dict.items():
            word_given_topic.append('%s %s' % (w, ' '.join([str(q[0]) for q in probility_word_given_topic[:, i].tolist()])))

        f1 = open(f_word_given_topic, 'w')

        for line in word_given_topic:
            f1.write(line)
            f1.write('\n')
        f1.close()

        f2 = open(f_doc_topic, 'w')

        for line in doc_topic:
            f2.write(line)
            f2.write('\n')
        f2.close()
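
# ---------------------------------------------------------------------------
# Driver script (kept in a separate file that imports the PLSA class above)
# ---------------------------------------------------------------------------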
from PLSA import PLSA
from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext('local')
    data = sc.textFile("E:/github/FGYML4/data/news_seg/news_seg.txt")
    plsa = PLSA(data, sc, 3, max_itr=1)
    plsa.train()
    plsa.save('D:/topic_word', 'D:/doc_topic')