基于Scala版本的TMDB大数据电影分析项目
怒發沖冠為紅顏
????????基于kaggle的TMDB電影數據集的數據分析,該數據集包括了大約5000部電影的相關信息。先來看一下TMDB電影數據集的數據
?
????????該數據集其實是csv文件,里面記錄這美國這些年上映的電影,以及電影的種類,觀看人數,主題,以及打分等詳細信息。
????????先來看一下各個字段的意義
????????不過需要注意的是,在csv文件里面并沒有表頭,也就是說并沒有上面字段。所以在使用Spark SQL處理該數據集的時候,需要創建StructType將上面的字段保存起來。
使用DataFrame處理
????????通過Spark讀取csv文件就可以直接創建出來一個DataFrame對象,如果在本項目中直接讀取csv文件雖然不會報錯,但是切分字段的時候會出現錯誤,原因很簡單,因為在genres字段里面有json數據,不是json的鍋,是這個genres字段的鍋,先來看一下這個genres字段把
[{""id"": 28, ""name"": ""Action""}, {""id"": 12, ""name"": ""Adventure""}, {""id"": 14, ""name"": ""Fantasy""}, {""id"": 878, ""name"": ""Science Fiction""}]????????大體上是這樣的,里面有多個json,而且每個json都用",",所以在split分割字段的時候貨到這這個genres字段也會根據","進行分割。
????????為了避免這種情況,第一種方法依舊使用","分割,依靠正則表達式進行分割
val value = sparkContext.textFile("date/tmdb_5000_movies.csv") val value1 = value.map(_.split(",(?=([^\"]*\"[^\"]*\")*[^\"]*$)")(1)) value1.collect().foreach(println(_))來看一下genres字段是否被切分
?
????????顯而易見,并沒有被切分。其實上面這個最后是個RDD類型的,當然也可以隱式轉換,toDF,轉換成dataFrame.
????????下面介紹第二種方法
????????通過StructType對象,首先將所有字段名放到一個字符串里面,對字符串進行操作,對字符串進行切分,切分出來的字符串依次為他們創建StructField對象(StructField對象包含了字段名和字段類型)
????????spark讀取csv文件的時候對csv進行配置參數,創建dataFrame。
????????這個方法是主要介紹的,來看一下具體操作把
//將轉換操作構造成一個方法def transform_demo() = { //創建Spark SQL環境 val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark Demo") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() //創建字段名字符串 val scheamdString = "budget,genres,homepage,id,keywords,original_language,original_title, overview,"+"popularity,production_companies,production_countries, release_date,revenue,runtime,spoken_languages,status" + "tagline,title,vote_average,vote_count" //將上面的字符串拆分出來 并且map到每一個StructField對象里面 val fields = scheamdString.split(",").map(fieldname => StructField(fieldname, StringType, nullable = true))//創建StructType對象,該對象包含每個字段的名稱 val scheam = StructType(fields) //csv文件所在的路徑 val path = "date/tmdb_5000_movies.csv" //創建mdf對象 val mdf = sparkSession.read.format("com.databricks.spark.csv") .schema(scheam) //指定所謂的表頭.option("inferSchema",value = false) .option("header",value = true) //schema就是header .option("nullValue","\\N") .option("escape","\"") .option("quoteAll","true").option("seq",",").csv(path) mdf }????????transform_Demo方法最后返回的是DataFrame對象。
????????來看一下dataFrame的schema
TMDB電影體裁的分布
????????什么是電影體裁的分布,說白了就是計數,所謂的WordCount。就是對genres字段進行計數,genres字段里面的數據
????????對里面的name進行wc。每個電影可能會屬于多個體裁,因此本題的意義就是求出來所有電影的體裁,關于愛情片的電影有多少部,關于動作片的電影有多少部。
????????需要先解析json數據,從每個電影中取出對應的體裁,使用wc進行計數統計。
1.實現一個CountByJson()函數,該函數實現從json解析出來并進行計數
def countByJson(field:String):org.apache.spark.rdd.RDD[(String,Int)] ={//對genres字段進行解析 genres字段只有id,name兩個列名 //jsonSchema包含了列名val jsonSchema =ArrayType(new StructType() .add("id", IntegerType).add("name",StringType)) //mdf是上面創建出來的dataframe //對genres字段進行查找篩選, //[json{}.json{},json{}] //from_json(數據,列名) 使用explode炸裂之后就是數組 mdf.select(mdf.col(field)).filter(mdf.col(field).isNotNull)//炸裂之后,重命名為 genres .select(explode(from_json(mdf.col(field), jsonSchema)) .as(field)) //genres.concat("name") = => genres.name,直接訪問name字段 .select(field.concat(".name")) //轉換成RDD .rdd //(體裁,1) .map(name=>(name.toString(),1)) //分區數為1 .repartition(1) //reducrByKey 根據key進行計數 .reduceByKey((x,y) => x + y) }來看一下精簡的流程圖,讓大家對上面的步驟有更加深刻的印象
?
????????雖然經過上面的步驟,我們完成了對電影體裁的計數,但是細心的同學應該會發現里面的name被[]包著,為了更加讓我們能夠一目了然,我們對上面的結果進行轉化內,將[]刪除。只留下(name,count)。
def countByGenres()={ val genresRDD = countByJson("genres") //將RDD的結果collect之后,對key進行轉化內,去掉[] val jsonString =genresRDD.collect().toList.map { case(genre,count) => ((genre.replace("[","").replace("]","")),count) } //遍歷結果 jsonString.foreach(println(_)) }?
????????可以看到,我們已經去掉了中括號。現在展示結果我們能更加的一目了然了。
前100個常見關鍵詞
????????找出關鍵詞中出現頻率最高的前100個,依舊是詞頻統計進行WC。不過keywords單詞是json格式的,意味著我們依舊需要對json進行解析。解析完成之后,進行上面的wc操作。
????????因此只需要更改函數countByJson里面的參數就可以。
????????看一下keywords的格式
????????思路很簡單,通過CountByJson之后,返回的是一個RDD。經過reduceByKey之后處理的數據
????????(name,count)
????????做到這里,這道題目就已經結束了,如果為了美觀也可以對(name,count)進行轉化,和上面的一樣,去掉[]。
def countByKeywords() = { val keywordsRDD = countByJson("keywords").sortBy(_._2,false) val jsonString = keywordsRDD.take(100).toList.map{case(keywords,count) => { (keywords.replace("[","").replace("]",""),count) } } jsonString }????????需要注意的是,reducrByKey處理后的數據并沒有進行排序,經過轉換算子sortBy根據(name,count),元組中的count進行排序(從大到小)。
TMDB最常見的10種預算數
????????探究每個電影的預算數,首先對預算字段進行過濾,去除預算項目為0的電影項目,根據預算聚合并聚合,根據計數進行排序。取前十個為最終的結果。
????????先來看一下budget字段的信息
?
????????這種數據最簡單,雖然在csv里面是整型的,但是我們在創建StructType對象的時候,創建的是字符串類型的。因此在使用DSL語言的時候需要先轉化列的數據類型,分組聚合group + ????????count()+order by取前10.
def countByBudget(order:String,ascending:Boolean):Array[Row]={ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark Demo") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import sparkSession.implicits._ if(ascending){ //對budget字段進行轉化類型 //filter過濾//group by + count 分組聚合//orderby排序 transform_demo().withColumn("budget",col("budget") .cast("Integer")) .filter($"budget" > 0 ).groupBy("budget") .count().orderBy(order)take(10) } else{ transform_demo().withColumn("budget", col("budget") .cast("Integer")).filter($"budget" > 0).groupBy("budget").count() .orderBy(desc(order)).take(10) } }TMDB中最常見的電影時長(只展示電影數>100的時長)
????????顯而易見,這些小實驗都是wc,不過是在wc的基礎上增加了或多或少的數據操作。來看看這個電影時長的操作吧。
????????電影數>100 runtime > 100
????????先看一下runtime的數據類型
?
????????transform_demo生成的是DataFrame,我們可以直接在DataFrame上進行操作,需要先轉換數據類型,將runtime轉換成Integer整型然后根據下面的操作進行
????????filter + group by + count +orderby
????????直接來看代碼吧
def distrbutionOfRuntime() = { val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Spark Demo") val sparkSession = SparkSession.builder().config(sparkConf).getOrCreate() import sparkSession.implicits._ transform_demo().withColumn("runtime",col("runtime").cast("Integer")) .groupBy("runtime").count().filter($"count" > 100).orderBy($"count".asc) .collect() }生產電影最多的10大公司
????????對生產公司這個字段'product_companies'
????????不過需要注意的是這個字段依舊是包含著多個json,使用countByJson就可以實現對這種字段進行json解析以及wc操作。經過countByJson之后,就已經是(name,count)這種形式,已經完成了計數。
????????計數完成之后,(name,count)這個元組,根據元組第二個元素進行SortBy排序,排序完成之后取前十個(name,count)就完成了目標
def countByCompanies() = { val production_companiesRDD = countByJson("production_companies") .sortBy(_._2,false) val list = production_companiesRDD.take(10).toList.map {case (company, count) => {(company.replace("[", "").replace("]", ""), count) } } list }TMDB的10大電影語言
????????該字段同樣也是json字段,使用countByJson方法進行解析之后,進行的操作差不多跟之前的操作千篇一律。
//TMDB中的10大電影語言 def countByLanguage() = { val countByLanguage = countByJson("spoken_languages").sortBy(_._2,false) val joinString = countByLanguage.take(10).toList.map {case (language, count) => (language.replace("[", "").replace("]", ""), count) } joinString }總結
????????這是一個入門案例,雖然說是入門案例,其實并不簡單,對于還有json的字段,需要進行處理,單獨split會報錯,需要split+正則,使用Spark SQL需要創建StructType對象,使用相關的json解析函數。總之呢剛開始的時候可能會很難,但是做的多了發現并不是那么的難。
????????可能這個案例比較簡單,之后會有更多的案例,不過到時候進行的操作可能更難。差不多五月份左右會將數倉項目梳理一遍。
????????這個項目很簡單的,但是為了這個項目我花了有將近三天把,StructType不懂先去看了StructType,Spark SQL用到的相關函數不會又去看了看函數。
????????大家也要努力奧,好想好想她。
總結
以上是生活随笔為你收集整理的基于Scala版本的TMDB大数据电影分析项目的全部內容,希望文章能夠幫你解決所遇到的問題。