生活随笔
收集整理的這篇文章主要介紹了
Spark项目实践--基于 TMDB 数据集的电影数据分析
小編覺得挺不錯的,現在分享給大家,幫大家做個參考.
基于 TMDB 數據集的電影數據分析 一、環境搭建 二、數據預處理 三、使用 Spark 將數據轉為 DataFrame 四、使用 Spark 進行數據分析并可視化 五,結語
一、環境搭建
從假設裸機,環境搭建開始,具體環境搭建操作大體流程如下,具體詳細流程點擊查看另一篇博客:spark環境搭建 大體流程: (1)安裝Linux操作系統:比如可以安裝Ubuntu 16.04 (2)安裝Hadoop:需要在Linux系統上安裝Hadoop (3)安裝Spark:需要在Linux系統上安裝Spark (4)最后為了方便編寫代碼,實現Linux與Windows下的pycharm對接。
本次實驗環境: pycharm2019專業版 Ubuntu16.04 pip3=10.0.1 pyspark3.0.2
二、數據預處理
環境搭建完成之后,開始做項目。 首先數據集介紹: 本次項目使用的數據集來自數據網站 Kaggle 的 tmdb-movie-metadata 電影數據集,該數據集包含大約 5000 部電影的相關數據。數據包含以下字段: 上圖中可以看出數據中某些字段包含 json 數據,因此直接使用 DataFrame 進行讀取會出現分割錯誤 ,所以如果要創建 DataFrame,需要先直接讀取文件生成 RDD,再將 RDD 轉為 DataFrame。過程中,還需要使用 python3 中的 csv 模塊對數據進行解析和轉換。
為了更方便的對 csv 文件轉化的 RDD 進行處理,需要首先去除csv文件的標題行。完成后,將處理好的文件 tmdb_5000_movies.csv 存儲到 HDFS 上方便進一步的處理,使用下面命令將文件上傳至 HDFS:
hdfs dfs -put tmdb_5000_movies.csv
問題一 如果上傳過程中遇到如下情況:hdfs: command not found 原因是沒有在bin目錄之下設置路徑,解決: 1.sudo vi /etc/profile打開原文件 2.在文件最后新起一行 添加路徑export PATH=/usr/local/hadoop/bin:$PATH 其中這里的/usr/local/hadoop/bin是你的hdfs所在的目錄,一定看清楚。 3.最后要使文件生效 ,這個一定不要忘了,不然文件也是白配置。命令:source /etc/profile
這時文件已經被上傳至HDFS中
三、使用 Spark 將數據轉為 DataFrame
打開Windows下的pycharm,連接到Linux,開始編程 因為讀入的 csv 文件是結構化的數據,因此可以將數據創建為 DataFrame 方便進行分析。 上面也說過,為了創建DataFrame ,要將HDFS上的數據加載為RDD,由RDD轉為DataFrame ,具體代碼以及解析如下:
'''
csv --> HDFS--> RDD --> DataFrame
1.創建 SparkSession 和 SparkContext 對象。
2.為 RDD 轉為 DataFrame 制作表頭 (schema)。schema 是一個 StructType 對象,
該對象使用一個 StructField 數組創建。
每一個 StructField 都代表結構化數據中的一個字段,構造 StructField 需要 3 個參數
1. 字段名稱
2. 字段類型
3. 字段是否可以為空
'''
import os
os
. environ
[ "JAVA_HOME" ] = "/usr/lib/jvm/jdk1.8.0_162"
os
. environ
[ "PYSPARK_PYTHON" ] = "/usr/bin/python3.5"
from pyspark
import SparkContext
from pyspark
. sql
import SparkSession
, Row
from pyspark
. sql
. types
import StringType
, StructField
, StructType
import json
import csv
import matplotlib
. pyplot
as plt
sc
= SparkContext
( 'local' , 'spark_project' )
sc
. setLogLevel
( 'WARN' )
spark
= SparkSession
. builder
. getOrCreate
( )
schemaString
= "budget,genres,homepage,id," \
"keywords,original_language," \
"original_title,overview,popularity," \
"production_companies,production_countries," \
"release_date,revenue,runtime,spoken_languages," \
"status,tagline,title,vote_average,vote_count" fields
= [ StructField
( field
, StringType
( ) , True ) for field
in schemaString
. split
( ',' ) ] schema
= StructType
( fields
)
moviesRdd
= sc
. textFile
( 'tmdb_5000_movies.csv' ) . map ( lambda line
: Row
( * next ( csv
. reader
( [ line
] ) ) ) )
mdf
= spark
. createDataFrame
( moviesRdd
, schema
)
以上數據預處理結束。最后可以打印出來查看數據結構,進行下面的數據分析。
四、使用 Spark 進行數據分析并可視化
接下來使用上述處理完成后得到的 DataFrame mdf 進行數據分析,首先對數據中的主要字段單獨進行分析,然后再分析不同字段之間的關系。 為了方便使用matplotlib進行數據可視化,對于每個字段不同的分析,都將分析結果導出為json文件保存之后,對該文件進行數據可視化。 接下來的每個分析都需要進行保存,所以提前定義一個save函數,直接調用即可:
def save ( path
, data
) : with open ( path
, 'w' ) as f
: f
. write
( data
)
注意!!!下列代碼中所有的save有關的函數如果被注釋掉的請打開,否則存儲不了的話,可視化分析是不可以的。
1.單獨分析
(1)分析數據集中電影的體裁分布
數據集中可以看出,電影的體裁字段是一個 json 格式的數據,所以,要統計不同體裁的電影的數量,首先需要解析這個字段的 json 數據,從中取出每個電影對應的體裁數組,然后使用詞頻統計的方法,根據相同key相加,統計不同體裁出現的頻率,即可得到電影的體裁分布。
madff
= mdf
. select
( "genres" ) . filter ( mdf
[ "genres" ] != '' ) . rdd
. flatMap
( lambda g
: [ ( v
, 1 ) for v
in map ( lambda x
: x
[ 'name' ] , json
. loads
( g
[ "genres" ] ) ) ] ) \
. repartition
( 1 ) . reduceByKey
( lambda x
, y
: x
+ y
)
res
= madff
. collect
( )
countByGenres
= list ( map ( lambda v
: { "genre" : v
[ 0 ] , "count" : v
[ 1 ] } , res
) )
print ( countByGenres
)
with open ( '/home/hadoop/test/TMDB/genres.json' , 'r' ) as f
: data
= json
. load
( f
)
print ( data
)
x
= list ( map ( lambda x
: x
[ 'genre' ] , data
) )
y
= list ( map ( lambda x
: x
[ 'count' ] , data
) )
print ( x
)
print ( y
)
plt
. bar
( x
, y
, 0.5 )
plt
. show
( )
(2) 前 100 個常見關鍵詞
該項分析電影關鍵詞中出現頻率最高的前一百個。字段也是 json 格式數據,因此也是進行頻率統計,統計結果進行降序排序并取前 100 項。
keyword
= mdf
. select
( "keywords" ) . filter ( mdf
[ "keywords" ] != '' ) . rdd
. flatMap
( lambda g
: [ ( v
, 1 ) for v
in map ( lambda x
: x
[ 'name' ] , json
. loads
( g
[ "keywords" ] ) ) ] ) \
. repartition
( 1 ) . reduceByKey
( lambda x
, y
: x
+ y
)
res1
= keyword
. sortBy
( lambda x
: - x
[ 1 ] ) . take
( 100 )
countByKeywords
= list ( map ( lambda v
: { "x" : v
[ 0 ] , "value" : v
[ 1 ] } , res1
) )
print ( countByKeywords
)
save
( '/home/hadoop/test/TMDB/keywords.json' , json
. dumps
( countByKeywords
) ) from wordcloud
import WordCloud
with open ( '/home/hadoop/test/TMDB/keywords.json' , 'r' ) as f
: data
= json
. load
( f
)
print ( data
)
x
= list ( map ( lambda x
: x
[ 'x' ] , data
) )
save
( '/home/hadoop/test/TMDB/keywordswc.json' , json
. dump
( x
) )
跟上面的分兩個批次執行,一個處理文本,一個將文本畫詞云
with open ( '/home/hadoop/test/TMDB/keywordswc.json' , 'r' ) as f
: datawc
= json
. load
( f
)
print ( datawc
)
datawc1
= "," . join
( datawc
)
wc
= WordCloud
( background_color
= "white" , \width
= 800 , \height
= 600 , \
) . generate
( datawc1
)
wc
. to_file
( 'keywords.png' )
plt
. imshow
( wc
)
plt
. axis
( 'off' )
plt
. show
( )
可視化結果: (3) 數據集 中最常見的 10 種預算數
countByBudget
= mdf
. filter ( mdf
[ "budget" ] != 0 ) . groupBy
( "budget" ) . count
( ) . \orderBy
( 'count' , ascending
= False ) . toJSON
( ) . map ( lambda j
: json
. loads
( j
) ) . take
( 10 )
print ( countByBudget
)
save
( '/home/hadoop/test/TMDB/budget.json' , json
. dumps
( countByBudget
) )
with open ( '/home/hadoop/test/TMDB/budget.json' , 'r' ) as f
: data
= json
. load
( f
)
print ( data
)
x
= list ( map ( lambda x
: x
[ 'budget' ] , data
) )
y
= list ( map ( lambda x
: x
[ 'count' ] , data
) )
print ( x
)
print ( y
)
plt
. bar
( x
, y
, 0.5 )
plt
. xticks
( rotation
= 45 )
plt
. show
( )
結果: 最常見的預算從高到低排列,可以看出20000000的最多。
(4)數據集中最常見電影時長 (只展示電影數大于 100 的時長)
Runtime
= mdf
. filter ( mdf
[ "runtime" ] != 0 ) . groupBy
( "runtime" ) . count
( ) \
. filter ( 'count>=100' ) . toJSON
( ) . map ( lambda j
: json
. loads
( j
) ) . collect
( )
print ( Runtime
)
save
( '/home/hadoop/test/TMDB/runtime.json' , json
. dumps
( Runtime
) )
with open ( '/home/hadoop/test/TMDB/runtime.json' , 'r' ) as f
: runtimedata
= json
. load
( f
)
print ( runtimedata
)
x
= list ( map ( lambda x
: x
[ "runtime" ] , runtimedata
) )
y
= list ( map ( lambda x
: x
[ "count" ] , runtimedata
) )
plt
. title
( 'Four ' )
plt
. xlabel
( 'runtime' )
plt
. ylabel
( 'count' )
ax
= plt
. bar
( x
, y
)
for rect
in ax
: w
= rect
. get_height
( ) plt
. text
( rect
. get_x
( ) + rect
. get_width
( ) / 2 , w
, '%d' % int ( w
) , ha
= 'center' , va
= 'bottom' )
plt
. show
( )
結果: 從可視化結果來看最常見的電影時長是90min左右
(5) 生產電影最多的 10 大公司
解析 json 格式字段從中提取出 name 并進行詞頻統計,這里取出"production_companies"字段名稱,進行排序,尋找前十個,選擇字段select(“production_companies”) ,在字段中篩選出空值 ,使用rdd中的map,最終聚合
res
= mdf
. select
( "production_companies" ) . filter ( mdf
[ "production_companies" ] != '' ) . rdd\
. flatMap
( lambda g
: [ ( v
, 1 ) for v
in map ( lambda x
: x
[ 'name' ] , json
. loads
( g
[ "production_companies" ] ) ) ] ) \
. repartition
( 1 ) . reduceByKey
( lambda x
, y
: x
+ y
)
res5
= res
. sortBy
( lambda x
: - x
[ 1 ] ) . take
( 10 )
countByCompanies
= list ( map ( lambda v
: { "company" : v
[ 0 ] , "count" : v
[ 1 ] } , res5
) )
print ( countByCompanies
)
save
( '/home/hadoop/test/TMDB/company_count.json' , json
. dumps
( countByCompanies
) )
with open ( '/home/hadoop/test/TMDB/company_count.json' , 'r' ) as f
: companydata
= json
. load
( f
)
print ( companydata
)
x
= list ( map ( lambda x
: x
[ "company" ] , companydata
) )
y
= list ( map ( lambda x
: x
[ "count" ] , companydata
) )
ax
= plt
. bar
( x
, y
)
for rect
in ax
: w
= rect
. get_height
( ) plt
. text
( rect
. get_x
( ) + rect
. get_width
( ) / 2 , w
, '%d' % int ( w
) , ha
= 'center' , va
= 'bottom' )
plt
. xticks
( rotation
= 10 )
plt
. show
( )
可視化: 最后x軸字體過長,傾斜度不多,大家可以自己再次調節看看。
(6)TMDb 中的 10 大電影語言 餅圖
該字段也是 JSON 數據,因此首先對每個項目進行詞頻統計,然后過濾掉語言為空的項目,最后排序取前十即可。
res
= mdf
. select
( "spoken_languages" ) . filter ( mdf
[ "spoken_languages" ] != '' ) . rdd\
. flatMap
( lambda g
: [ ( v
, 1 ) for v
in map ( lambda x
: x
[ 'name' ] , json
. loads
( g
[ "spoken_languages" ] ) ) ] ) \
. repartition
( 1 ) . reduceByKey
( lambda x
, y
: x
+ y
)
res6
= res
. sortBy
( lambda x
: - x
[ 1 ] ) . take
( 10 )
countByLanguage
= list ( map ( lambda v
: { "language" : v
[ 0 ] , "count" : v
[ 1 ] } , res6
) )
print ( countByLanguage
)
save
( '/home/hadoop/test/TMDB/language.json' , json
. dumps
( countByLanguage
) )
with open ( '/home/hadoop/test/TMDB/language.json' , 'r' ) as f
: landata
= json
. load
( f
)
print ( landata
)
x
= list ( map ( lambda x
: x
[ 'language' ] , landata
) )
y
= list ( map ( lambda x
: x
[ 'count' ] , landata
) )
plt
. pie
( x
= y
, labels
= x
, autopct
= '%.2f%%' , pctdistance
= 0.8 , labeldistance
= 1.1 , startangle
= 180 , radius
= 1.2 , counterclock
= False , wedgeprops
= { 'linewidth' : 1.5 , 'edgecolor' : 'green' } , textprops
= { 'fontsize' : 10 , 'color' : 'black' } , )
plt
. legend
( loc
= 'upper left' , bbox_to_anchor
= ( - 0.3 , 1 ) )
plt
. grid
( )
plt
. show
( )
可視化: 其中有中文字體,由于沒有設置中文沒有顯示出來,這也是在最后整理的時候發現的問題。
2.字段之間的關系分析
(1)預算與評價的關系
對于每個電影,需要導出如下的數據:[電影標題title,預算budget,評價vote_average],其中選擇后兩個字段進行關系繪圖 需要注意的是,上述代碼都是放在一個代碼文件中運行的,到這里開始一個關系就是一個py文件,這里除了調用必要的包之外,還需要調用上述文件的mdf變量以及save函數。 import dataone from dataone import mdf #調用第一個文件中定義的變量mdf
import os
os
. environ
[ "JAVA_HOME" ] = "/usr/lib/jvm/jdk1.8.0_162"
os
. environ
[ "PYSPARK_PYTHON" ] = "/usr/bin/python3.5"
import json
import csv
import matplotlib
. pyplot
as plt
import dataone
from dataone
import mdf
budgetVote
= mdf
. select
( "title" , "budget" , "vote_average" ) \
. filter ( mdf
[ "budget" ] != 0 ) . filter ( mdf
[ "vote_count" ] > 100 ) . collect
( )
print ( budgetVote
)
dataone
. save
( 'budget_vote.json' , json
. dumps
( budgetVote
) )
with open ( 'budget_vote.json' , 'r' ) as f
: buddata
= json
. load
( f
) print ( buddata
) x
= list ( map ( lambda x
: x
[ 1 ] , buddata
) )
y
= list ( map ( lambda x
: x
[ 2 ] , buddata
) ) X
= [ ]
Y
= [ ]
for i
in x
: X
. append
( int ( i
) / 10000 )
for i
in y
: Y
. append
( float ( i
) )
print ( X
)
print ( Y
)
plt
. scatter
( X
, Y
)
plt
. xticks
( rotation
= 90 )
plt
. show
( )
保存的json文件: 可視化(只取一部分值畫圖):
(2)電影發行時間與評價的關系
這部分考慮發行時間與評價之間的關系,因此對于每個電影,需要導出如下的數據:[電影標題,發行時間,評價]
import os
os
. environ
[ "JAVA_HOME" ] = "/usr/lib/jvm/jdk1.8.0_162"
os
. environ
[ "PYSPARK_PYTHON" ] = "/usr/bin/python3.5"
import json
import csv
import matplotlib
. pyplot
as plt
import dataone
from dataone
import mdf
dateVote
= mdf
. select
( mdf
[ "release_date" ] , "vote_average" , "title" ) \
. filter ( mdf
[ "release_date" ] != "" ) . filter ( mdf
[ "vote_count" ] > 100 ) . collect
( ) dataone
. save
( 'date_vote.json' , json
. dumps
( dateVote
) )
with open ( 'date_vote.json' , 'r' ) as f
: datedata
= json
. load
( f
)
print ( datedata
)
x
= list ( map ( lambda x
: x
[ 0 ] , datedata
) )
y
= list ( map ( lambda x
: x
[ 1 ] , datedata
) )
X
= [ ]
Y
= [ ]
for i
in x
: X
. append
( i
)
for i
in y
: Y
. append
( float ( i
) )
print ( X
)
print ( Y
)
plt
. bar
( X
, Y
)
plt
. xticks
( rotation
= 90 )
plt
. show
( )
看一下保存文件的數據結構: 可視化(依舊是部分取值):
(3)流行度和評價的關系
這部分考慮流行度與評價之間的關系,因此對于每個電影,需要導出如下的數據:[電影標題,流行度,評價]
import os
os
. environ
[ "JAVA_HOME" ] = "/usr/lib/jvm/jdk1.8.0_162"
os
. environ
[ "PYSPARK_PYTHON" ] = "/usr/bin/python3.5"
import json
import csv
import matplotlib
. pyplot
as plt
import dataone
from dataone
import mdf
popVote
= mdf
. select
( "title" , "popularity" , "vote_average" ) \
. filter ( mdf
[ "popularity" ] != 0 ) . filter ( mdf
[ "vote_count" ] > 100 ) . collect
( )
dataone
. save
( 'pop_vote.json' , json
. dumps
( popVote
) ) with open ( 'pop_vote.json' , 'r' ) as f
: popdata
= json
. load
( f
)
x
= list ( map ( lambda x
: x
[ 1 ] , popdata
) )
y
= list ( map ( lambda x
: x
[ 2 ] , popdata
) )
X
= [ ]
Y
= [ ]
for i
in x
: X
. append
( float ( i
) )
for i
in y
: Y
. append
( float ( i
) )
print ( X
)
print ( Y
)
plt
. scatter
( X
, Y
)
plt
. xticks
( rotation
= 90 )
plt
. xlabel
( "popularity" )
plt
. ylabel
( "vote_average" )
plt
. show
( )
(4)公司生產的電影平均分和數量的關系
計算每個公司生產的電影數量及這些電影的平均分分布。 首先,需要對數據進行過濾,去掉生產公司字段為空和評價人數小于 100 的電影,然后對于每一條記錄,得到一條如下形式的記錄: [公司名,(評分,1)]
import os
os
. environ
[ "JAVA_HOME" ] = "/usr/lib/jvm/jdk1.8.0_162"
os
. environ
[ "PYSPARK_PYTHON" ] = "/usr/bin/python3.5"
import json
import csv
import matplotlib
. pyplot
as plt
import dataone
from dataone
import mdf budgetRevenue
= mdf
. select
( "title" , "budget" , "revenue" ) \
. filter ( mdf
[ "budget" ] != 0 ) . filter ( mdf
[ 'revenue' ] != 0 ) . collect
( )
print ( budgetRevenue
)
dataone
. save
( 'budget_revenue.json' , json
. dumps
( budgetRevenue
) ) with open ( 'budget_revenue.json' , 'r' ) as f
: buddata
= json
. load
( f
)
x
= list ( map ( lambda x
: x
[ 1 ] , buddata
) )
y
= list ( map ( lambda x
: x
[ 2 ] , buddata
) )
X
= [ ]
Y
= [ ]
for i
in x
: X
. append
( float ( i
) )
for i
in y
: Y
. append
( int ( i
) )
print ( X
)
print ( Y
)
plt
. scatter
( X
, Y
)
plt
. xticks
( rotation
= 90 )
plt
. xlabel
( "budget" )
plt
. ylabel
( "revenue" )
plt
. show
( )
(5)電影預算和營收的關系
這部分考慮電影的營收情況,因此對于每個電影,需要導出如下的數據: [電影標題,預算,收入]
import os
os
. environ
[ "JAVA_HOME" ] = "/usr/lib/jvm/jdk1.8.0_162"
os
. environ
[ "PYSPARK_PYTHON" ] = "/usr/bin/python3.5"
import json
import csv
import matplotlib
. pyplot
as plt
import dataone
from dataone
import mdf
budgetRevenue
= mdf
. select
( "title" , "budget" , "revenue" ) \
. filter ( mdf
[ "budget" ] != 0 ) . filter ( mdf
[ 'revenue' ] != 0 ) . collect
( )
print ( budgetRevenue
)
dataone
. save
( 'budget_revenue.json' , json
. dumps
( budgetRevenue
) ) with open ( 'budget_revenue.json' , 'r' ) as f
: buddata
= json
. load
( f
)
x
= list ( map ( lambda x
: x
[ 1 ] , buddata
) )
y
= list ( map ( lambda x
: x
[ 2 ] , buddata
) )
X
= [ ]
Y
= [ ]
for i
in x
: X
. append
( int ( i
) )
for i
in y
: Y
. append
( int ( i
) )
print ( X
)
print ( Y
)
plt
. scatter
( X
, Y
)
plt
. xticks
( rotation
= 90 )
plt
. xlabel
( "budget" )
plt
. ylabel
( "revenue" )
plt
. show
( )
五,結語
到此,整個數據分析項目結束。 總結,在進行項目實驗時: 對于數據 1.考慮數據的結構 2.考慮數據的字段,可以做什么有價值的分析,以及用什么樣的可視化可以將分析結果最直觀的表達出來 對于編程 1.了解數據結構之后設計相應的數據框架(存儲數據的格式),如本項目中將csv文件傳輸到HDFS中,HDFS將文件加載至RDD,最后由RDD轉為DataFrame進行數據分析。 也要想到為什么最終會選擇使用DataFrame,因為
DataFrame是一種表格型數據結構,它含有一組有序的列,每列可以是不同的值。DataFrame既有行索引,也有列索引
DataFrame方便對數據進行提取分析處理。
2.使用的Spark知識,總的就是選擇select,過濾filter,聚合reduceByKey,統計,排序sortBy。還需熟練使用matplotlib畫圖方法。
至此結束,若有錯誤,請大佬們指正,感謝閱讀。
總結
以上是生活随笔 為你收集整理的Spark项目实践--基于 TMDB 数据集的电影数据分析 的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔 網站內容還不錯,歡迎將生活随笔 推薦給好友。