自制基于HMM的python中文分词器
不像英文那樣單詞之間有空格作為天然的分界線, 中文詞語之間沒有明顯界限。必須采用一些方法將中文語句劃分為單詞序列才能進一步處理, 這一劃分步驟即是所謂的中文分詞。
主流中文分詞方法包括基于規則的分詞,基于大規模語料庫的統計學習方法以及在實踐中應用最多的規則與統計綜合方法。
隱馬爾科夫模型(HMM)是中文分詞中一類常用的統計模型, 本文將使用該模型構造分詞器。關于HMM模型的介紹可以參見隱式馬爾科夫模型.
方法介紹
中文分詞問題可以表示為一個序列標注問題,定義兩個類別:
E代表詞語中最后一個字
B代表詞的首個字
M代表詞中間的字
S代表單字成詞
對于分詞結果:"我/只是/做了/一些/微小/的/工作",可以標注為"我E只B是E做B了E一B些E微B小E的S工B作E".
將標記序列"EBEBEBEBESBE"作為狀態序列, 原始文本"我只是做了一些微小的工作"為觀測序列. 分詞過程即變成了求使給定觀測序列出現概率最大的狀態序列, 即解碼問題。
這里需要說明一下,所謂出現概率最大是指在自然語言中出現概率最大。
根據語料庫是否標注, HMM可以進行監督學習和無監督學習。若語料庫未標注則需要使用EM算法進行無監督學習, 若語料庫已經標注那么可以使用頻率估計概率即監督學習方法。
程序實現
本文實現已在python3.5, OS X及Ubuntu操作系統上測試通過.
定義數據結構
上文中已定義用于標注序列的標簽, 也即HMM中的狀態:
STATES = {'B', 'M', 'E', 'S'}除了用于分詞外, HMM模型還可以用于處理很多問題. 因此, 我們將HMM模型封裝為獨立的類便于復用:
class HMModel:def __init__(self):self.trans_mat = {} # trans_mat[status][status] = intself.emit_mat = {} # emit_mat[status][observe] = intself.init_vec = {} # init_vec[status] = intself.state_count = {} # state_count[status] = intself.states = {}self.inited = False其中:
trans_mat: 狀態轉移矩陣, trans_mat[state1][state2]表示訓練集中由state1轉移到state2的次數。
emit_mat: 觀測矩陣, emit_mat[state][char]表示訓練集中單字char被標注為state的次數
init_vec: 初始狀態分布向量, init_vec[state]表示狀態state在訓練集中出現的次數
state_count: 狀態統計向量,state_count[state]
word_set: 詞集合, 包含所有單詞
初始化上述數據結構:
def setup(self):for state in self.states:# build trans_matself.trans_mat[state] = {}for target in self.states:self.trans_mat[state][target] = 0.0# build emit_matself.emit_mat[state] = {}# build init_vecself.init_vec[state] = 0# build state_countself.state_count[state] = 0self.inited = True加載數據:
def load_data(self, filename):self.data = file(data_path(filename))self.setup()訓練數據集用空格分割單詞:
如果 出現 這種 情況 , 則 增加 的 居民 儲蓄 存款 的 全部 或 一 部 只能 抵 作 積壓 產品 占用 的 資金 而 不能 補充 流動資金 正常 需要量 的 增加訓練模型
因為使用標注數據集, 可以使用更簡單的監督學習算法:
def do_train(self, observes, states):if not self.inited:self.setup()for i in range(len(states)):if i == 0:self.init_vec[states[0]] += 1self.state_count[states[0]] += 1else:self.trans_mat[states[i - 1]][states[i]] += 1self.state_count[states[i]] += 1if observes[i] not in self.emit_mat[states[i]]:self.emit_mat[states[i]][observes[i]] = 1else:self.emit_mat[states[i]][observes[i]] += 1訓練函數輸入觀測序列和狀態序列進行訓練, 依次更新各矩陣數據.
類中維護的模型參數均為頻數而非頻率, 這樣的設計使得模型可以進行在線訓練。
所謂的在線訓練是指: 模型隨時都可以接受新的訓練數據繼續訓練,不會丟失前次訓練的結果。
預測算法
在進行預測前需要定義get_prob方法將頻數轉換為頻率:
def get_prob(self):init_vec = {}trans_mat = {}emit_mat = {}# convert init_vec to probfor key in self.init_vec:init_vec[key] = float(self.init_vec[key]) / self.state_count[key]# convert trans_mat to probfor key1 in self.trans_mat:trans_mat[key1] = {}for key2 in self.trans_mat[key1]:trans_mat[key1][key2] = float(self.trans_mat[key1][key2]) / self.state_count[key1]# convert emit_mat to probfor key1 in self.emit_mat:emit_mat[key1] = {}for key2 in self.emit_mat[key1]:emit_mat[key1][key2] = float(self.emit_mat[key1][key2]) / self.state_count[key1]return init_vec, trans_mat, emit_mat預測采用Viterbi算法求得最優路徑:
def do_predict(self, sequence):tab = [{}]path = {}init_vec, trans_mat, emit_mat = self.get_prob()# initfor state in self.states:tab[0][state] = init_vec[state] * emit_mat[state].get(sequence[0], EPS)path[state] = [state]# build dynamic search tablefor t in range(1, len(sequence)):tab.append({})new_path = {}for state1 in self.states:items = []for state2 in self.states:if tab[t - 1][state2] == 0:continueprob = tab[t - 1][state2] * trans_mat[state2].get(state1, EPS) * emit_mat[state1].get(sequence[t], EPS)items.append((prob, state2))best = max(items) # best: (prob, state)tab[t][state1] = best[0]new_path[state1] = path[best[1]] + [state1]path = new_path# search best pathprob, state = max([(tab[len(sequence) - 1][state], state) for state in self.states])return path[state]tab是動態規劃表, tab[t][state]表示 時刻t 到達state狀態 的所有路徑中,概率最大路徑的 概率值。
初值tab[0][state]為emit_mat[state][char0] * init_vec[state], 其中char0代表輸入序列的第一個單字, 若emit_mat中不存在char0的記錄則默認為0.
若輸入序列的長度為T, 則執行T-1次迭代, 每次迭代在tab中添加一行. 計算t-1時刻各狀態轉移到state狀態的概率, 選擇其中最大者作為tab[t][state]的值, 并將選中的狀態轉移寫入path中。
在tab的最后一行中尋找概率最大的狀態作為終態,從path中取出狀態轉移路徑作為標注序列。
保存載入
提供json和pickle兩種格式的存儲. 實現保存方法:
def save(self, filename="hmm.json", code='json'):fw = open(filename, 'w', encoding='utf-8')data = {"trans_mat": self.trans_mat,"emit_mat": self.emit_mat,"init_vec": self.init_vec,"state_count": self.state_count}if code == "json":txt = json.dumps(data)txt = txt.encode('utf-8').decode('unicode-escape')fw.write(txt)elif code == "pickle":pickle.dump(data, fw)load方法也很簡單:
def load(self, filename="hmm.json", code="json"):fr = open(filename, 'r', encoding='utf-8')if code == "json":txt = fr.read()model = json.loads(txt)elif code == "pickle":model = pickle.load(fr)self.trans_mat = model["trans_mat"]self.emit_mat = model["emit_mat"]self.init_vec = model["init_vec"]self.state_count = model["state_count"]self.inited = True實現分詞器
定義一個工具函數, 對每個字進行標注:
STATES = {'B', 'M', 'E', 'S'}def get_tags(src):tags = []if len(src) == 1:tags = ['S']elif len(src) == 2:tags = ['B', 'E']else:m_num = len(src) - 2tags.append('B')tags.extend(['M'] * m_num)tags.append('S')return tags最后定義一個工具函數, 根據標注序列將輸入的句子分割為詞語列表:
def cut_sent(src, tags):word_list = []start = -1started = Falseif len(tags) != len(src):return Noneif tags[-1] not in {'S', 'E'}:if tags[-2] in {'S', 'E'}:tags[-1] = 'S' # for tags: r".*(S|E)(B|M)"else:tags[-1] = 'E' # for tags: r".*(B|M)(B|M)"for i in range(len(tags)):if tags[i] == 'S':if started:started = Falseword_list.append(src[start:i]) # for tags: r"BM*S"word_list.append(src[i])elif tags[i] == 'B':if started:word_list.append(src[start:i]) # for tags: r"BM*B"start = istarted = Trueelif tags[i] == 'E':started = Falseword = src[start:i+1]word_list.append(word)elif tags[i] == 'M':continuereturn word_list需要說明的是, 因為概率模型無法保證每個B標記后都有對應的E標記, 因此認為B標記后的S和B標記都會像E標記一樣結束一個單詞。
繼承HMModel類, 實現HMMSegger分詞器:
class HMMSegger(HMModel):def __init__(self, *args, **kwargs):super(HMMSegger, self).__init__(*args, **kwargs)self.states = STATESself.data = Nonedef load_data(self, filename):self.data = open(filename, 'r', encoding="utf-8")def train(self):if not self.inited:self.setup()# trainfor line in self.data:# pre processingline = line.strip()if not line:continue# get observesobserves = []for i in range(len(line)):if line[i] == " ":continueobserves.append(line[i])# get stateswords = line.split(" ") # spilt word by whitespacestates = []for word in words:if word in seg_stop_words:continuestates.extend(get_tags(word))# resume trainself.do_train(observes, states)train()方法根據單詞生成觀測序列和狀態序列, 并送往do_train()方法進行訓練.
編寫cut()方法作為操作接口:
def cut(self, sentence):try:tags = self.do_predict(sentence)return cut_sent(sentence, tags)except:return sentence最后寫幾個測試用例看下效果:
>>> segger = HMMSegger() >>> segger.load_data("data.txt") >>> segger.train() >>> segger.cut("長春市長春節講話") ["長春","市長","春節","講話"] >>> segger.cut("生存還是死亡是一個問題") ['生存', '還', '是', '死亡', '是', '一個', '問題'] >>> segger.cut("我只是做了一些微小的工作") ["我","只是","做","了","一些","微小","的","工作"]分詞的結果基本令人滿意.
HMModel源碼
HMMSegger源碼
因依賴關系,若要運行代碼請clone整個項目, 并運行python3 test.py。
因訓練數據集過于龐大故未上傳,可自行制備訓練數據集。 項目中包含訓練過的模型data/hmm.json可以自由體驗。
轉載于:https://www.cnblogs.com/Finley/p/6358097.html
總結
以上是生活随笔為你收集整理的自制基于HMM的python中文分词器的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: windows安装composer方法和
- 下一篇: Python实现抓取CSDN博客首页文章