Notes on a Detailed Reading of the Spark MLlib TF-IDF Source Code
The TF-IDF algorithm is frequently used when extracting text features, and Spark MLlib implements it. Below is an example of invoking TF-IDF in Spark MLlib:
```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

object TfIdfExample {
  def main(args: Array[String]): Unit = {
    // The original snippet used a null SparkContext placeholder; a real one is created here
    val sc = new SparkContext(new SparkConf().setAppName("tf-idf-example"))

    // Load documents (one per line, already tokenized).
    val documents: RDD[Seq[String]] = sc.textFile("...").map(_.split(" ").toSeq)

    val hashingTF = new HashingTF()
    // Compute TF
    val tf: RDD[Vector] = hashingTF.transform(documents)
    tf.cache()
    // Obtain the IDFModel object
    val idf = new IDF().fit(tf)
    // Compute the TF-IDF values
    val tfidf: RDD[Vector] = idf.transform(tf)
  }
}
```

The input data must be one document per line, already tokenized. Spark MLlib does not provide a tokenizer, but it suggests tools such as those from the Stanford NLP Group and scalanlp/chalk.
1. A detailed reading of the TF source
In the calling code, we find:
```scala
val hashingTF = new HashingTF()
// Compute TF
val tf: RDD[Vector] = hashingTF.transform(documents)
```

Obtaining the TF is done mainly through the `transform` method of the `HashingTF` class. Following that method:
```scala
/**
 * Transforms the input document to term frequency vectors.
 */
@Since("1.1.0")
def transform[D <: Iterable[_]](dataset: RDD[D]): RDD[Vector] = {
  dataset.map(this.transform)
}
```
Spark MLlib is built on RDDs, so you must be familiar with RDDs before reading the source. Next, look at the `transform` method invoked inside `dataset.map(this.transform)`:
```scala
/**
 * Transforms the input document into a sparse term frequency vector.
 */
@Since("1.1.0")
def transform(document: Iterable[_]): Vector = {
  // Map holding the term frequencies
  val termFrequencies = mutable.HashMap.empty[Int, Double]
  // Loop over every term in this document
  document.foreach { term =>
    // Get the vector position corresponding to the term
    val i = indexOf(term)
    // i now stands for this term; accumulate its count in termFrequencies
    termFrequencies.put(i, termFrequencies.getOrElse(i, 0.0) + 1.0)
  }
  // Map the term features into a very high-dimensional sparse vector.
  // numFeatures is a member of HashingTF; it can be passed to the constructor
  // and defaults to 2^20 if not supplied.
  Vectors.sparse(numFeatures, termFrequencies.toSeq)
}
```
This `transform` method is executed once per line (i.e., once per document). It counts the term frequencies within each document and stores them in a very high-dimensional sparse vector. Each term's position in that vector is computed by:
```scala
@Since("1.1.0")
def indexOf(term: Any): Int = Utils.nonNegativeMod(term.##, numFeatures)
```

`term.##` is equivalent to `hashCode()`: it yields each term's hash value, which is then taken modulo `numFeatures`, producing an Int.
At this point the TF computation is done. The final result is a vector holding term positions and the corresponding term frequencies, i.e., `SparseVector(size, indices, values)`.
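To make the index computation concrete, here is a minimal self-contained sketch (illustrative only, not the MLlib source; the object and method names are made up) that mimics `indexOf` and the term counting above:

```scala
import scala.collection.mutable

object HashingTfSketch {
  // Illustrative feature-space size; MLlib's HashingTF defaults to 2^20.
  val numFeatures: Int = 1 << 20

  // Same idea as Utils.nonNegativeMod(term.##, numFeatures):
  // a hash code can be negative, so fold it into [0, numFeatures).
  def toyIndexOf(term: Any): Int = {
    val raw = term.## % numFeatures
    if (raw < 0) raw + numFeatures else raw
  }

  def main(args: Array[String]): Unit = {
    val doc = Seq("spark", "tfidf", "spark")
    val termFrequencies = mutable.HashMap.empty[Int, Double]
    doc.foreach { term =>
      val i = toyIndexOf(term)
      termFrequencies.put(i, termFrequencies.getOrElse(i, 0.0) + 1.0)
    }
    // The index for "spark" appears with count 2.0
    println(termFrequencies)
  }
}
```

Note that two distinct terms can collide on the same index; `HashingTF` simply adds their counts together.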
2. A detailed reading of the IDF source
```scala
// Obtain the IDFModel object; the input tf vectors are of type SparseVector(size, indices, values)
val idf = new IDF().fit(tf)
// Compute the TF-IDF values
val tfidf: RDD[Vector] = idf.transform(tf)
```

The IDF implementation consists of two main steps.
Step 1: `val idf = new IDF().fit(tf)`
```scala
/**
 * Computes the inverse document frequency.
 * @param dataset an RDD of term frequency vectors
 */
@Since("1.1.0")
def fit(dataset: RDD[Vector]): IDFModel = {
  // Returns the IDF vector, whose type is DenseVector(values)
  val idf = dataset.treeAggregate(
    // minDocFreq is the minimum number of documents a term must appear in; defaults to 0
    new IDF.DocumentFrequencyAggregator(minDocFreq = minDocFreq))(
    seqOp = (df, v) => df.add(v),          // accumulate
    combOp = (df1, df2) => df1.merge(df2)  // merge
  ).idf()
  new IDFModel(idf)
}
```

The prototype of the `treeAggregate` method used above is:

```scala
def treeAggregate[U: ClassTag](zeroValue: U)(
    seqOp: (U, T) => U,
    combOp: (U, U) => U,
    depth: Int = 2): U
```
`treeAggregate` performs its computation with mapPartitions under the hood; you must supply two operators, one for accumulation and one for merging results (a toy example follows the two definitions below):
seqOp — an operator used to accumulate results within a partition
combOp — an associative operator used to combine results from different partitions
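As a toy illustration of how `seqOp` and `combOp` cooperate (a sketch, not part of the TF-IDF code; it assumes a local Spark installation), here is a sum-and-count computed in one pass:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TreeAggregateDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("treeAggregate-demo").setMaster("local[*]"))

    val data = sc.parallelize(1 to 100, numSlices = 4)

    // zeroValue (0L, 0L) is the neutral (sum, count) pair
    val (sum, count) = data.treeAggregate((0L, 0L))(
      // seqOp: fold one element into a partition's running (sum, count)
      seqOp = (acc, x) => (acc._1 + x, acc._2 + 1),
      // combOp: merge the (sum, count) pairs from two partitions
      combOp = (a, b) => (a._1 + b._1, a._2 + b._2)
    )

    println(s"sum=$sum count=$count mean=${sum.toDouble / count}") // sum=5050 count=100
    sc.stop()
  }
}
```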
The `treeAggregate` call returns a new `IDF.DocumentFrequencyAggregator` object, on which the `idf()` method is then invoked to obtain the idf vector; finally `new IDFModel(idf)` wraps it into an `IDFModel` object.
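Concretely, as the `idf()` method shown further below computes, the IDF of a term $t$ is the smoothed value

$$
\mathrm{idf}(t) = \ln\frac{m + 1}{\mathrm{df}(t) + 1}
$$

where $m$ is the total number of documents and $\mathrm{df}(t)$ is the number of documents containing $t$. For example, with $m = 100$ and $\mathrm{df}(t) = 9$, $\mathrm{idf}(t) = \ln(101/10) \approx 2.31$; terms that fall below the `minDocFreq` threshold get an idf of 0.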
Below are the methods of the `DocumentFrequencyAggregator` class, i.e., one `add` (the seqOp) and one `merge` (the combOp):
```scala
private object IDF {

  /** Document frequency aggregator. */
  class DocumentFrequencyAggregator(val minDocFreq: Int) extends Serializable {

    /** Number of documents (the total document count) */
    private var m = 0L
    /** Document frequency vector: for each term, the number of documents it appears in */
    private var df: BDV[Long] = _

    def this() = this(0) // constructor; minDocFreq defaults to 0 if not supplied

    /** Adds a new document. This is the per-partition accumulation step; the input is a TF vector. */
    def add(doc: Vector): this.type = {
      if (isEmpty) {
        df = BDV.zeros(doc.size)
      }
      doc match {
        // The TF vectors are SparseVectors, so this case is taken
        case SparseVector(size, indices, values) =>
          val nnz = indices.size
          var k = 0
          while (k < nnz) {
            if (values(k) > 0) {
              df(indices(k)) += 1L // if the term's frequency in this document is > 0, its df goes up by 1
            }
            k += 1
          }
        case DenseVector(values) =>
          val n = values.size
          var j = 0
          while (j < n) {
            if (values(j) > 0.0) {
              df(j) += 1L
            }
            j += 1
          }
        case other =>
          throw new UnsupportedOperationException(
            s"Only sparse and dense vectors are supported but got ${other.getClass}.")
      }
      m += 1L
      this
    }

    /** Merges another aggregator. This is the cross-partition merge step. */
    def merge(other: DocumentFrequencyAggregator): this.type = {
      if (!other.isEmpty) {
        m += other.m // merge the total document counts
        if (df == null) {
          df = other.df.copy
        } else {
          df += other.df // merge the df vectors
        }
      }
      this
    }

    private def isEmpty: Boolean = m == 0L

    /** Returns the current IDF vector. This is where the idf vector is computed. */
    def idf(): Vector = {
      if (isEmpty) {
        throw new IllegalStateException("Haven't seen any document yet.")
      }
      val n = df.length
      val inv = new Array[Double](n)
      var j = 0
      while (j < n) {
        /*
         * If the term is not present in the minimum number of documents,
         * set IDF to 0. This will cause multiplication in IDFModel to
         * set TF-IDF to 0.
         *
         * Since arrays are initialized to 0 by default,
         * we just omit changing those entries.
         */
        if (df(j) >= minDocFreq) { // if df reaches the threshold, compute the idf value; otherwise leave it at 0
          inv(j) = math.log((m + 1.0) / (df(j) + 1.0))
        }
        j += 1
      }
      Vectors.dense(inv) // return the idf dense vector
    }
  }
}
```

Step 2: with the idf vector computed above, what remains is to compute tf * idf, which uses the `transform` method of the `IDFModel` class: `val tfidf: RDD[Vector] = idf.transform(tf)`
```scala
private object IDFModel {

  /**
   * Transforms a term frequency (TF) vector to a TF-IDF vector with an IDF vector.
   *
   * @param idf an IDF vector
   * @param v a term frequency vector
   * @return a TF-IDF vector
   */
  def transform(idf: Vector, v: Vector): Vector = {
    val n = v.size
    v match {
      // This case is taken
      case SparseVector(size, indices, values) =>
        val nnz = indices.size
        val newValues = new Array[Double](nnz)
        var k = 0
        while (k < nnz) {
          newValues(k) = values(k) * idf(indices(k)) // compute tf * idf
          k += 1
        }
        Vectors.sparse(n, indices, newValues) // the TF-IDF vector
      case DenseVector(values) =>
        val newValues = new Array[Double](n)
        var j = 0
        while (j < n) {
          newValues(j) = values(j) * idf(j)
          j += 1
        }
        Vectors.dense(newValues)
      case other =>
        throw new UnsupportedOperationException(
          s"Only sparse and dense vectors are supported but got ${other.getClass}.")
    }
  }
}
```

That is the whole TF-IDF computation. It uses Spark MLlib's dense vectors (DenseVector) and sparse vectors (SparseVector), plus RDD aggregation operations.
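As a quick sanity check with made-up numbers, combining the two steps:

$$
m = 4,\quad \mathrm{df}(t) = 2,\quad \mathrm{tf}(t,d) = 3
\;\Longrightarrow\;
\mathrm{tfidf}(t,d) = 3 \cdot \ln\frac{4+1}{2+1} = 3\ln\frac{5}{3} \approx 1.53
$$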
Three classes are mainly involved: HashingTF, IDF, and IDFModel.
One more caveat: in the TF-IDF vectors produced by Spark MLlib, the position stored for each term is the term's hash value modulo the vector dimension, not the term itself. If you later need the actual words, for classification or text recommendation, some extra bookkeeping is required, for example as sketched below.
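A minimal sketch of such bookkeeping (not part of MLlib; it assumes the `documents` RDD and `hashingTF` instance from the usage example at the top): build an index-to-term lookup by hashing every distinct term once more with the public `indexOf` method.

```scala
// Illustrative only: recover which term a vector index stands for.
// Assumes `documents: RDD[Seq[String]]` and `hashingTF: HashingTF`
// from the usage example above.
val indexToTerm: Map[Int, String] =
  documents
    .flatMap(terms => terms)                        // every term occurrence in the corpus
    .distinct()                                     // each distinct term once
    .map(term => (hashingTF.indexOf(term), term))   // same hashing as HashingTF.transform
    .collect()
    .toMap

// Beware: two different terms can hash to the same index (a collision);
// this map then keeps only one of them.
```

For a large vocabulary it may be preferable to keep this mapping as an RDD rather than collecting it to the driver.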
Reposted from: https://my.oschina.net/xiaoluobutou/blog/670367