Spark 数据ETL
Spark?數據ETL
?
?
?
?
說明
1、本文翻譯自《Machine Learning with Spark》書中第三章第3,4節內容。
2、本文一些內容基于http://blog.csdn.net/u011204847/article/details/51224383。
3、大家如果有看不懂的地方可以參考原書(網上可以搜到)。
?
?
?
數據處理以及轉化
1、當我們完成了一些對數據集的探索和分析,我們知道了一些關于用戶數據以及電影數據的特征,接下來我們該做些什么呢?
2、為了讓原始數據能夠在機器學習算法中變得有用,我們首先需要清理以及在提取有用的特征值之前使用各種方法盡可能地轉化它。其中的轉化和特征提取步驟是緊密連接的,而且在一些情況下,特定的轉化就是一種特征值提取的過程。
3、我們已經看過了在電影數據集中需要清理數據的例子。通常,現實的數據集包含壞的數據、丟失的數據以及異常值。理想情況下,我們可以糾正錯誤的數據;但是,這通常都是不可能的。因為很多數據集來源于那些不能夠重復的集合操作。丟失的數據以及異常值也是很常見的,它們可以用類似于壞數據的處理方法處理。總的來說,歸結為以下廣泛的處理方法:
?
過濾掉或者移除壞數據以及丟失的數據:?
有時候這是不可避免的;然而這也意味著丟失掉大部分壞的或丟失的記錄。
?
填充壞掉或者丟失的數據:?
我們可以盡力地依據剩下的數據來給壞掉的或者丟失的數據賦值。比如賦給0值、平均值、中位數、附近的值或者相似值等方法。選擇正確的方法通常是一件棘手的任務,這取決于數據、情況和自己的經驗。
?
應用成熟的技術到異常值:?
異常值的主要問題在于它們的值可能是正確的,盡管它們是極端值。它們也有可能是錯誤的。所以很難知道我們處理的是哪種情況。異常值也可以被移除或者填充。不過幸運的是,有是統計技術(如穩健回歸)來處理異常值和極端值。
?
轉化潛在的異常值:
另一個處理異常值或者極端值得方法是轉化。例如對數或者高斯內核轉化,計算出潛在的異常值,或者顯示大范圍的潛在數據。這些類型的轉換抑制了變量大尺度變化的影響并將非線性關系轉化為一個線性的。
?
?
填充壞的或丟失的數據:
我們之前已經見過過濾壞數據的例子了。我們接著之前的代碼,下面的代碼段對壞數據應用了填充的方法,通過賦給數據點以相等于year中值的值。
years_pre_processed = movie_fields.map(lambda fields: fields[2]). map(lambda x: convert_year(x)).collect() years_pre_processed_array = np.array(years_pre_processed)
首先,我們將在選擇所有的發布年限后計算year的平均值和中位數,除了那些壞的數據。之后使用numpy函數,從years_pre_processed_array中查找壞數據的索引(參考之前我們賦予1900給數據點)。最后,我們使用這個索引來賦予中值給壞的數據:
mean_year = np.mean(years_pre_processed_array[years_pre_processed_ array!=1900]) median_year = np.median(years_pre_processed_array[years_pre_processed_ array!=1900]) index_bad_data = np.where(years_pre_processed_array==1900)[0][0] years_pre_processed_array[index_bad_data] = median_year print "Mean year of release: %d" % mean_year print "Median year of release: %d" % median_year print "Index of '1900' after assigning median: %s" % np.where(years_ pre_processed_array == 1900)[0]打印結果應該類似于如下:
Mean year of release: 1989 Median year of release: 1995 Index of '1900' after assigning median: []在這里我們計算出year的平均值和中位數,從輸出結果中我們可以看出,year的中位數因為year的傾斜分布要比平均值高許多。盡管直接決定使用一個精確的值去填充數據不是常見的做法,但是由于數據的傾斜,使用中位數去賦值是一種可行的方法。
?
?
從數據中提取有用的特征
?
1、當我們完成了對數據初始的處理和清洗,我們就可以準備從數據中提取一些實際有用的特征,這些特征數據可以用于以后的機器學習模型中的訓練。
2、特征數據是指我們用于訓練模型的一些變量。每行數據都有可能包含可以提取用于訓練的樣例。幾乎所有的機器學習模型都是工作在以數字為技術的向量數據上。因此,我們需要將粗糙的數據轉化為數字。
特征數據可以分為以下幾類:
數字特征
這類特征數據是指一些數值類型的數據。
分類特征
這類特征數據代表一些相同特性的,可以歸為一類的一些數據。例如用戶的性別、職位或者電影的類型。
文本特征
這類特征數據是從數據中的文本內容中派生出來的,例如電影名稱,描述,以及評論。
其他特征
這類特征數據都會轉化為以數字為代表的特征,例如圖片,視頻,音頻都可以表示為數字數據的集合。地理位置可以代表為經度、緯度或者經緯度之差。
?
?
數字特征
1、舊數字和提取的新的特征數值有什么區別呢?其實,在現實生活中,任何的數值數據都可以作為輸入變量,但在機器學習模型中,我們學習的是每個特征的向量權重,例如監督學習模型。
2、因此,我們需要使用那些有意義的特征數據,那些模型可以從特征值與目標數據之間學習關系的特征數據。例如,年齡就是一個合理的特征數據,比如年齡的增長和產出有著直接的關系,同樣,身高也是可以直接使用的數值特征。
?
?
分類特征
1、分類特征數據不能直接使用它們原有的粗糙的格式作為輸入使用,因為它們不是數字。但是它們其中的一些衍生值可以作為輸入的變量。比如之前所說的職位就可以有學生、程序員等。
2、這些分類變量只是名義上的變量,因為它們不存在變量值之間的順序的概念。相反,當變量之間存順序概念時,我們會傾向于使用這些常見有序的變量。
3、為了把這些分類變量轉化為數字表示,我們可以使用常用的方法,例如1-of-k編碼。這種方法需要把那些名義上的變量轉化為對機器學習任務有用的數據。常見那些粗糙格式的數據都會以名義上的變量形式編碼為有意義的數據。
4、我們假設這里有k個值可以供變量獲取,如果我們可以給每個值都賦予1到k中的索引,然后我們就可以使用程度為k的二進制向量表示一個值了。初始的實體中,向量表示的二進制值都是0,當我們賦予變量一個狀態的時候,所對應的二進制向量中對應的索引值由0變成1。
?
例如,我們先獲取上面所說的職位的所有類別變量:
all_occupations = user_fields.map(lambda fields: fields[3]). distinct().collect() all_occupations.sort()接著我們可以賦給每個可能的職位類別一個值(值得索引從零開始,因為在Python、Scala、Java數組中索引都是從0開始的)
idx = 0 all_occupations_dict = {} for o in all_occupations:all_occupations_dict[o] = idx idx += 1 # try a few examples to see what "1-of-k" encoding is assigned print "Encoding of 'doctor': %d" % all_occupations_dict['doctor'] print "Encoding of 'programmer': %d" % all_occupations_ dict['programmer']你將看到如下打印結果:
Encoding of 'doctor': 2 Encoding of 'programmer': 14最后我們可以對上面打印的結果中programmer進行編碼,我們可以首先創建一個長度為k(在這個案例中)的numpy數組并且值全部填0(我們將使用numpy數組中的zeros函數創建這個數組)。
我們將提取單詞programmer的索引并賦予1給數組的這個索引:
K = len(all_occupations_dict) binary_x = np.zeros(K) k_programmer = all_occupations_dict['programmer'] binary_x[k_programmer] = 1 print "Binary feature vector: %s" % binary_x print "Length of binary vector: %d" % K上面結果將呈現給我們長度為21的二進制特征的向量:
Binary feature vector: [ 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.0. 0. 1. 0. 0. 0. 0. 0. 0.] Length of binary vector: 21衍生特征
1、通常會從一或多個可獲得的變量中計算出衍生特征是很有用的,我們希望那些衍生特征可以相比于原來粗糙格式的變量添加更多的信息。
2、例如,我們可以計算所有電影評分數據中的用戶平均評分,用戶平均評分將提供針對用戶截差的模型。我們已經獲取了粗糙的評分數據,并且創建了新的可以讓我們學習更好模型的特征。
3、從粗糙數據中獲取衍生特征數據的例子包括平均值、中位數、求和、最大值、最小值以及總數等。比如在電影數據中,我們可以通過現在的年限減去電影發布年限獲得電影的年齡。
4、通常,這些轉化用來產生數值數據以便于更好的讓模型去學習。
5、把數字特征值轉化為分類特征值也很常見,比如
?
轉化timestamps值為分類特征值
為了演示怎樣從數字特征值衍生為分類特征值,我們將使用電影評分數據中的評分時間。這些時間都是Unix timestamps格式。我們可以用Python的datetime模塊去從timestamp中獲取date和time,然后提取day中的hour。這將為每個評分中day的hour成一個RDD。
我們將需要一個函數去提取代表評分timestamp的datetime:
?
def extract_datetime(ts):import datetimereturn datetime.datetime.fromtimestamp(ts)?
我們繼續使用之前例子之中計算出的rating_data RDD
首先,我們使用map轉化提取timestamp列,把它轉化為Python中int類型。對每個timestamp應用extract_datetime方法,然后從結果datetime對象中提取hour:
timestamps = rating_data.map(lambda fields: int(fields[3])) hour_of_day = timestamps.map(lambda ts: extract_datetime(ts).hour) hour_of_day.take(5)
如果我們從結果RDD中獲取前五條記錄,我們將看到以下輸出結果:
[17, 21, 9, 7, 7]至此我們已經將粗糙的時間數據轉化為了評分數據中代表day中hour的分類特征數據
?
現在,我們說的這種轉化可能優點粗糙,也許我們想更加貼切地定義轉化。我們可以將每天中的小時轉化為代表每天時間中的塊。例如我們可以定義morning是從7 am到 11 am、lunch是從11 am到 1am等。使用這些塊,我們可以創建方法給每天中的時間賦值,下面將day中的hour作為輸入:
def assign_tod(hr):times_of_day = {'morning' : range(7, 12),'lunch' : range(12, 14),'afternoon' : range(14, 18),'evening' : range(18, 23),'night' : range(23, 7)} for k, v in times_of_day.iteritems():if hr in v:return k?
現在,我們可以將assign_tod函數應用到存在于hour_of_day RDD中的每個評分記錄中的hour上。
time_of_day = hour_of_day.map(lambda hr: assign_tod(hr)) time_of_day.take(5)如果我們獲取這個RDD的前5條記錄,我們將看到如下轉化后的值:
['afternoon', 'evening', 'morning', 'morning', 'morning']到此,我們已經將timestamp變量轉化為24小時格式的hours變量,以及自定義的每天中的時間值。因此我們已經有了分類特征值,可以使用之前介紹的1-of-k編碼方法去生成二進制特征的向量。
?
?
文本特征值
1、在某些情況下,文本特征值是以分類以及衍生特征存在的。我們拿電影的描述信息作為例子。這里,粗糙的數據不能被直接使用,即使是作為分類特征,因為如果每個文本都有值,那將會產生無限種可能組合的單詞。我們的模型幾乎不會出現兩種相同特征,就算有那么學習效率也不會高。因此,我們希望將原始文本變成一種更適合機器學習的形式。
2、有很多的方法可以處理文本,而且自然語言領域處理致力于處理、呈現和模型化文本內容。我們將介紹簡單和標準的方法來實現文本特征提取,這個方法就是詞袋模型表示。
3、詞袋模型將文本塊視為單詞的集合以及可能存在的數字,詞袋方法的處理如下:
標記:首先,一些形式的標記用于將文本分割為標記的集合(一般是單詞,數字等)。例如常見的空格標記,將文本按照每個空格分隔,還有其他的一些標點和非字母數字的標記。
可以移除的停止詞:一般我們會移除文本中非常常見的詞,例如”the”、”and”、”but”(這些都稱為停止詞)。
?
詞干提取:接下來的操作包括詞干提取,一種獲取輸入項,然后將其提取為其最基礎的值。一個常見的例子就是復數編程單數,或者dogs變成dog。有很多方法可以實現詞干提取,有很多文本處理庫也包含各種詞干提取算法。
?
向量化:最后一步是將處理項轉化為向量表示形式。最簡單的形式也許就是二進制的向量表示形式,如果一個處理項包含在文本中,我們就給它賦值為1,如果沒有就賦值為0。本質上是我們之前提到的分類的1-of-k編碼。類似1-of-k編碼,這里需要一個字典將這些項映射為一個個索引。也許你會想到,這里可能存在幾百萬單獨項。因此,使用稀疏向量表示形式是非常嚴格的,只在那些處理項已被保存的情況下使用,這樣可以節省內存、磁盤空間以及處理時間。
?
簡單文本特征提取
我們使用電影評分數據中的電影名稱演示以二進制向量方法提取文本特征值。
首先我們創建函數去除每部電影的發布年限,僅留下電影名稱。
電影數據示例:
1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0
我們將使用Python的 regular expression模塊re,去搜索出存在于電影名稱列的電影發布年限。當我們匹配到這個regular expression,我們將只提取出電影名稱,示例:
def extract_title(raw):import re# this regular expression finds the non-word (numbers) betweenparenthesesgrps = re.search("\((\w+)\)", raw)if grps:# we take only the title part, and strip the trailingwhite spacefrom the remaining text, belowreturn raw[:grps.start()].strip()else:return raw接下來,我們將從movie_fields RDD中提取出粗糙的電影名稱:
//包含電影發布年限,格式:Toy Story (1995) raw_titles = movie_fields.map(lambda fields: fields[1])然后我們通過下面的代碼提取5條記錄測試extract_title函數的功能:
for raw_title in raw_titles.take(5):print extract_title(raw_title)通過打印結果我們可以驗證函數執行情況,打印結果示例:
Toy Story GoldenEye Four Rooms Get Shorty Copycat我們將應用函數以及標記模式來提取電影名稱為單個元素,下面我們使用簡單地空格標記來分離電影名稱。
movie_titles = raw_titles.map(lambda m: extract_title(m)) # next we tokenize the titles into terms. We'll use simple whitespace tokenization title_terms = movie_titles.map(lambda t: t.split(" ")) print title_terms.take(5)打印結果:
[[u'Toy', u'Story'], [u'GoldenEye'], [u'Four', u'Rooms'], [u'Get',u'Shorty'], [u'Copycat']]現在我們可以看出電影名稱以及被按照空格分離為單個的標記了。
為了給每一項賦值一個向量的索引,我們需要創建詞典,將每一項都映射到一個整數索引。
首先,我們將使用Spark的flatMap函數來擴張title_terms RDD中每條記錄的list字符串,轉化為每條記錄都是一項的名為all_terms的RDD。
我們獲取所有的唯一項,然后賦值索引,就像之前的對職位操作的1-of-k編碼。
?
# next we would like to collect all the possible terms, in order to build out dictionary of term <-> index mappings all_terms = title_terms.flatMap(lambda x: x).distinct().collect() # create a new dictionary to hold the terms, and assign the "1-of-k" indexes idx = 0 all_terms_dict = {} for term in all_terms:all_terms_dict[term] = idx idx +=1?
我們打印出唯一項的總數來測試我們的map功能是否正常工作:
print "Total number of terms: %d" % len(all_terms_dict) print "Index of term 'Dead': %d" % all_terms_dict['Dead'] print "Index of term 'Rooms': %d" % all_terms_dict['Rooms']打印結果:
Total number of terms: 2645 Index of term 'Dead': 147 Index of term 'Rooms': 1963我們也可以使用Spark的zipWithIndex函數來更加有效地實現上面的結果,這個函數獲取values的RDD然后通過索引合并它們并且創建一個新的key-value對RDD,這個新的RDD的key就是唯一項,value是這個項的字典索引。我們通過使用collectAsMap函數來將這個key-value RDD作為Python字典方法傳入driver。
all_terms_dict2 = title_terms.flatMap(lambda x: x).distinct(). zipWithIndex().collectAsMap() print "Index of term 'Dead': %d" % all_terms_dict2['Dead'] print "Index of term 'Rooms': %d" % all_terms_dict2['Rooms']
打印結果:
Index of term 'Dead': 147 Index of term 'Rooms': 1963
最后一步是創建一個函數將唯一項的集合轉化為一個稀疏的向量表示形式。為了達到效果,我們將創建一個空的,有一行以及和字典中唯一項總數的列的稀疏矩陣。然后我們將通過輸入列表中的每一項來檢查這一項是否存在于我們的唯一項字典中。如果是,我們將給這個字典中對應的這個唯一項的索引賦值為1。
# this function takes a list of terms and encodes it as a scipy sparse vector using an approach # similar to the 1-of-k encoding def create_vector(terms, term_dict):from scipy import sparse as spnum_terms = len(term_dict)x = sp.csc_matrix((1, num_terms))for t in terms:if t in term_dict:idx = term_dict[t]x[0, idx] = 1return x
當我們有了上面的函數之后,我們會將它應用到提取項的RDD中的每一條記錄中。
all_terms_bcast = sc.broadcast(all_terms_dict) term_vectors = title_terms.map(lambda terms: create_vector(terms, all_ terms_bcast.value)) term_vectors.take(5)我們可以查看一些稀疏向量新的RDD的執行記錄:
[<1x2645 sparse matrix of type '<type 'numpy.float64'>' with 2 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<type 'numpy.float64'>' with 1 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<type 'numpy.float64'>' with 2 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<type 'numpy.float64'>' with 2 stored elements in Compressed Sparse Column format>, <1x2645 sparse matrix of type '<type 'numpy.float64'>' with 1 stored elements in Compressed Sparse Column format>]從上面的記錄中可以看到電影名稱已經被轉化為一個稀疏向量,我們可以看出那些提取為兩個單詞的電影名稱項在向量中有兩個非0的實體,那些提取為一個單詞名稱項的在向量中有一個非0實體,其他類似。
?
標準化特征值
一旦特征值被提取為向量的形式,通常的預處理步驟為標準化數值數據。思路為通過將每個值轉化為標準大小的方法轉化每一個數值特征值。我們有不同的方法來標準化,如下:
標準化一個特征:
這通常應用于數據集中的一個單獨的特征,例如減去均值(使特征值中心化)或者使用標準正態變換。
標準化特征向量:
這通常是應用轉化到數據集中給定一行的所有特征值,這樣得到的特征向量有一個標準化的長度。也就是說,我們將確保每個特征向量是以1為基礎按比例縮小的。
?
下面我們使用第二種方法作為例子。我們將使用Python中numpy模塊的norm函數來實現向量的標準化,,首先計算一個L2規范的隨機向量,然后使用這種規范分離向量中每個元素除來創建我們標準化的向量。
np.random.seed(42) x = np.random.randn(10) norm_x_2 = np.linalg.norm(x) normalized_x = x / norm_x_2 print "x:\n%s" % x print "2-Norm of x: %2.4f" % norm_x_2 print "Normalized x:\n%s" % normalized_x print "2-Norm of normalized_x: %2.4f" % np.linalg.norm(normalized_x)上面將給出如下結果,
x: [ 0.49671415 -0.1382643 0.64768854 1.52302986 -0.23415337 -0.234136961.57921282 0.76743473 -0.46947439 0.54256004] 2-Norm of x: 2.5908 Normalized x: [ 0.19172213 -0.05336737 0.24999534 0.58786029 -0.09037871-0.09037237 0.60954584 0.29621508 -0.1812081 0.20941776] 2-Norm of normalized_x: 1.0000
使用MLlib實現特征標準化
Spark在它的MLlib機器學習庫中內建了一些功能擴展以及標準化的函數。包括StandardScaler,用于標準正態變換;以及Normalizer,提供了我們之前處理示例代碼中的向量標準化功能。
?
下面讓我們簡單地使用MLlib的Normalizer來比較之前的結果:
from pyspark.mllib.feature import Normalizer normalizer = Normalizer() vector = sc.parallelize([x])當引入需要的類以后,我們將實例化Normalizer。注意,在大多數的Spark解決方案中,我們需要帶RDD的Normalizer作為輸入(包含了numpy數組或者MLlib向量);因此,我們將從向量x創建單元素的RDD來作為演示。
?
我們將在RDD上使用Normalizer的transform函數。當RDD中最終只有一個向量時,我們通過調用first來返回向量到driver中,最后調用toArray函數將vector轉化回numpy數組。
normalized_x_mllib = normalizer.transform(vector).first().toArray()最后,我們打印詳情,和之前所做的一樣,對比結果:
print "x:\n%s" % x print "2-Norm of x: %2.4f" % norm_x_2 print "Normalized x MLlib:\n%s" % normalized_x_mllib print "2-Norm of normalized_x_mllib: %2.4f" % np.linalg. norm(normalized_x_mllib)你最后會發現通過我們的代碼實現了同樣的標準化向量。但是,使用MLlib中內建的方法比我們自己的函數要更加方便和高效。
?
總結
以上是生活随笔為你收集整理的Spark 数据ETL的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: codeforces 56E 多米诺骨牌
- 下一篇: 工具栏的打印图标不见了_我的电脑右下角任