大数据技术原理与应用:期末考点总结
個人期末復習材料,根據林子雨的大數據技術教材與其它資料整理。
目錄
- 第一章 大數據概述
- 第二章 Hadoop
- 第三章 HDFS
- 第四章 HBase
- 第五章 NoSQL
- 第六章 云數據庫
- 第七章 MapReduce
- 第八章 Hadoop 2.x
- 第九章 Spark
- 第十章 流計算
第一章 大數據概述
1.大數據的4v特征
- 數據量大 volume
- 價值密度低 value
- 數據類型繁多 variety
- 處理速度快 velocity
2.大數據3種思維方式的轉變
在思維方式方面,大數據完全顛覆了傳統的思維方式:
-
全樣而非抽樣
-
效率而非精確
-
相關而非因果
3.大數據兩大核心技術
分布式存儲和分布式處理
?
4.大數據計算模式及其代表產品
| 批處理計算 | 針對大規模數據的批量處理 | MapReduce、Spark等 |
| 流計算 | 針對流數據的實時計算 | Storm、S4、Flume、Streams、Puma、DStream、Super Mario、銀河流數據處理平臺等 |
| 圖計算 | 針對大規模圖結構數據的處理 | Pregel、GraphX、Giraph、PowerGraph、Hama、GoldenOrb等 |
| 查詢分析計算 | 大規模數據的存儲管理和查詢分析 | Dremel、Hive、Cassandra、Impala等 |
?
5.大數據、云計算與物聯網之間的區別和聯系
第二章 Hadoop
Hadoop面試題 http://www.dajiangtai.com/community/18456.do
1.Hadoop的發展歷史
2002年,Hadoop起源于Doug Cutting開發Apache Nutch網絡搜索引擎項目。
2004年,Nutch項目也模仿GFS開發了自己的分布式文件系統NDFS(Nutch Distributed File System),也就是HDFS的前身。
2004年,谷歌公司又發表了另一篇具有深遠影響的論文《MapReduce:Simplified Data Processing on Large Clusters(Mapreduce:簡化大規模集群上的數據處理)》,闡述了MapReduce分布式編程思想。
2005年,Doug Cutting等人開始嘗試實現MapReduce計算框架,并將它與NDFS(Nutch Distributed File System)結合,用以支持Nutch引擎的主要算法,Nutch開源實現了谷歌的MapReduce。
2006年2月,由于NDFS和MapReduce在Nutch引擎中有著良好的應用,Nutch中的NDFS和MapReduce開始獨立出來,成為Lucene項目的一個子項目,稱為Hadoop,同時,Doug Cutting加盟雅虎。
2008年1月,Hadoop正式成為Apache頂級項目,包含眾多子項目,Hadoop也逐漸開始被雅虎之外的其他公司使用。同年4月,Hadoop打破世界紀錄,成為最快排序1TB數據的系統,它采用一個由910個節點構成的集群進行運算,排序時間只用了209秒。
在2009年5月,Hadoop更是把1TB數據排序時間縮短到62秒。Hadoop從此名聲大震,迅速發展成為大數據時代最具影響力的開源分布式開發平臺,并成為事實上的大數據處理標準。
2.Hadoop的特性
Hadoop以一種可靠、高效、可伸縮的方式進行處理的,它具有以下幾個方面的特性:
- 高可靠性:多副本
- 高效性:并行工作
- 高可擴展性:方便擴展服務器
- 高容錯性:失敗的任務會重新分配
- 成本低:廉價的集群設備
- 運行在Linux平臺上
- 支持多種編程語言
3.Hadoop的版本
Apache Hadoop版本分為兩代,我們將第一代Hadoop稱為Hadoop 1.0,第二代Hadoop稱為Hadoop 2.0。
Hadoop 1.x 和Hadoop 2.x的區別:在1.x版本中,MapReduce負責邏輯運算和資源調度,耦合性比較大;2.x版本中新增了YARN,負責資源調度,這樣MapReduce就只負責運算了。
?
4.Hadoop生態系統/項目結構
| HDFS | 分布式文件存儲系統 |
| MapReduce | 分布式并行計算框架 |
| YARN | 資源調度管理框架 |
| HBase | 分布式非關系型數據庫 |
| Hive | Hadoop上的數據倉庫。提供HQL,將HQL語句轉化為MapReduce程序 |
| Zookeeper | 提供分布式協調一致性服務 |
| Kafka | 高吞吐量的分布式發布/訂閱消息系統 |
| Pig | 基于Hadoop的大數據分析平臺,提供類似sql的查詢語言Pig Latin。 |
| Flume | 日志采集框架 |
| Oozie | Hadoop上的作業流調度系統 |
| Spark | 分布式并行計算框架 |
| Sqoop | 數據傳輸框架,用于MySQL與HDFS之間的數據傳遞 |
| Storm | 流計算框架 |
5.配置文件中的參數
所有配置文件:
重點關注 hdfs-site.xml,core-site.xml
-
hdfs-site.xml
-
core-site.xml
hadoop.tmp.dir 是 hadoop文件系統依賴的基本配置,很多配置路徑都依賴它,它的默認位置是在/tmp/{$user}下面,注意這是個臨時目錄。因此,它的持久化配置很重要的, 如果選擇默認,一旦因為斷電等外在因素影響,/tmp/{$user}下的所有東西都會丟失。
?
第三章 HDFS
1.分布式文件系統結構
主從結構:分布式文件系統在物理上是由諸多計算機節點組成的,這里計算機節點分為兩類,一類叫主節點,一類叫從節點。
2.HDFS的目標
- 大數據集
- 流式數據讀寫
- 簡單的文件模型
- 強大的跨平臺兼容性
- 廉價的硬件設備
3.HDFS的局限性
-
不適合低延遲數據訪問(不適合實時處理,io開銷大)
-
無法高效存儲大量小文件(文件塊機制)
-
不支持多用戶并發寫入及任意修改文件(一個文件,同時只允許一個寫入者對文件進行追加)
4.塊 Block
塊是HDFS中文件存儲的基本單位,在Hadoop2.x中文件塊大小默認為128MB,在1.x中默認為64MB。
HDFS采用抽象的塊概念可以帶來以下幾個明顯的好處:
- 支持大規模文件存儲:文件以塊為單位進行存儲,一個大規模文件可以被分拆成若干個文件塊,不同的文件塊可以被分發到不同的節點上,因此,一個文件的大小不會受到單個節點的存儲容量的限制,可以遠遠大于網絡中任意節點的存儲容量
- 簡化系統設計(簡化了文件和元數據的管理):首先,大大簡化了存儲管理,因為文件塊大小是固定的,這樣就可以很容易計算出一個節點可以存儲多少文件塊;其次,方便了元數據的管理,元數據不需要和文件塊一起存儲,可以由其他系統負責管理元數據
- 適合數據備份:每個文件塊都可以冗余存儲到多個節點上,大大提高了系統的容錯性和可用性
5.HDFS體系結構
hdfs中采用了主-從結構模型,一個hdfs集群中包含1個namenode和若干個datanode。
- 名稱節點 namenode
- 數據節點 datanode
- 客戶端 client
6.NameNode 名稱節點
namenode節點是整個hdfs集群的唯一的主節點,負責:
- 接收和回復客戶的訪問請求
- 存儲文件系統的所有元數據(管理文件系統的命名空間)
名稱節點(NameNode)負責管理分布式文件系統的命名空間(Namespace),保存了兩個核心的數據結構,即 FsImage 和 EditLog。
-
FsImage
命名空間鏡像文件。FsImage 用于維護文件系統樹以及文件樹中所有的文件和目錄的元數據,即包含文件系統中所有目錄和文件inode的序列化形式。
-
EditLog
操作日志文件。EditLog 中記錄了所有針對文件的創建、刪除、重命名等操作。
?
啟動過程(處于安全模式)
在名稱節點啟動的時候,第一次啟動NameNode格式化后,創建Fsimage和Edits文件。如果不是第一次啟動,直接加載編輯日志和鏡像文件到內存。
它會將FsImage文件中的內容加載到內存中,之后再執行EditLog文件中的各項操作,使得內存中的元數據和實際的同步。
一旦在內存中成功建立文件系統元數據的映射,則創建一個新的FsImage文件和一個空的EditLog文件。
?
7.DataNode 數據節點
datanode節點是hdfs集群的工作節點,負責:
- 數據的存儲:存儲文件系統的數據文件,每個文件被分成多個數據塊存儲在不同節點上。
- 數據的讀寫:接收客戶端的讀寫請求
- 定期向namenode發送心跳信息,若沒有發送則被標記為宕機
- 在namenode的調度下進行對數據塊的操作
8.元數據
存儲的信息:hdfs中的元數據包含HDFS中文件的所有塊和塊的存儲位置、修改和訪問時間、訪問權限、大小等信息。
存儲的位置:元數據存儲在NameNode節點的FsImage數據結構中,由它負責管理。
9.HDFS工作機制(上面都有提到過)
-
NameNode與SecondaryNameNode
(1)NN的啟動過程
(2)采用SecondaryNameNode的原因
(3)SNN的工作機制
-
DataNode
存儲文件、注冊并接收與回復client讀寫請求、發送塊列表、發送心跳信息
10.通信協議(了解)
HDFS中有5種通信協議,各個節點之間根據不同協議通過RPC (Remote Procedure Call) 進行交互。
11.HDFS冗余數據存儲
HDFS對于同一個數據塊會存儲多個副本,默認為3個。且不同副本被分布到不同節點上。
保證:系統的容錯性和可用性
優點:加快數據傳輸速度、多個副本對比容易檢查數據錯誤、保證數據可靠性
13.HDFS數據存儲策略
假如一個數據塊有3個副本,
那么第1個副本會隨機存儲在一個機架上的某個節點;
第2個副本會存儲在與第1個副本相同機架的不同節點上;
第3個副本會存儲在與第1個副本不同機架的隨機節點上。
14.HDFS數據錯誤的三種類型
- NameNode數據錯誤
- DataNode數據錯誤
- 數據出錯
15.HDFS常用shell命令
# 啟動HDFS [ht@hadoop101 ~]$ start-dfs.sh# 停止HDFS [ht@hadoop101 ~]$ stop-dfs.sh# 輸出某個命令的幫助信息 [ht@hadoop101 ~]$ hadoop fs -help ls# 顯示目錄詳細信息,-p表示遞歸 [ht@hadoop101 ~]$ hadoop fs -ls [-R]# 在HDFS上創建目錄,-p表示遞歸創建 [ht@hadoop101 ~]$ hadoop fs -mkdir -p /user/ht# 顯示文件內容 [ht@hadoop101 myfile]$ hadoop fs -cat /user/ht/test.txt# 將HDFS上的文件拷貝到 HDFS的另一個目錄 # 從/user/ht/test.txt 拷貝到 /user/ht/file/ [ht@hadoop101 myfile]$ hadoop fs -cp /user/ht/test.txt /user/ht/file/# -copyFromLocal:從本地文件系統中拷貝文件到HDFS路徑去 # -copyToLocal:從HDFS拷貝到本地 # -put:等同于copyFromLocal # -get:等同于copyToLocal # -mv:在HDFS目錄中移動文件# -chgrp將文件所屬的用戶組改為ht,-R表示遞歸 # -chmod改變文件權限、-chown改變文件所屬用戶 也是一樣的 [ht@hadoop101 ~]$ hadoop fs -chgrp -R ht /user/ht/test.txt# 刪除文件或文件夾,-r表示遞歸 [ht@hadoop100 hadoop]$ hdfs dfs -rm [-r] /user/ht/wcoutput # -rmdir:刪除空目錄 # -du 統計目錄的大小信息第四章 HBase
1.起源
HBase是谷歌的BigTable的開源實現。
2.HBase和BigTable的底層技術對應關系
?
3.HBase與傳統關系型數據庫的對比
區別主要在于:
-
數據類型:hbase中所有數據都是字符串類型;關系型數據庫中具有多種數據類型。
-
數據操作:hbase只能對數據進行增、刪、查、清空等操作,不能進行表之間的連接;關系型數據庫可以增刪改查,還可以通過表的外鍵進行連接。
-
存儲模型:hbase基于列存儲,關系型數據庫基于行存儲。
-
數據維護:hbase對數據進行操作后會保留歷史版本。
-
數據索引:hbase只有一個索引——行鍵,關系型數據庫可以創建很多索引。
-
可伸縮性:hbase可以通過集群節點的擴展實現存儲數據量的水平擴展,關系型數據庫難以實現橫向擴展,縱向擴展的空間有限。
在hbase中:類型是未經解釋的字符串,只能對它進行增刪查等操作,索引就是它本身的行鍵,它就是按列存儲,對它操作后還會保留歷史版本,hbase還通過集群的機器增加和減少來實現存儲容量的增大和縮小。
4.HBase的物理視圖與概念視圖
?
5.Master 和 Region的功能
-
Master
master負責管理和維護HBase表的分區信息(Region列表),維護Region服務器列表,分配Region以確保負載均衡。
-
Region
region負責存儲hbase表的數據,處理來自客戶端的讀寫請求。
6.Region的定位(HBase的三層結構)
7.Region服務器工作原理
-
用戶讀寫數據過程
-
緩存刷新
-
StoreFile的合并
8.HLog工作原理
HLog是記錄Region中各項更新操作的日志,它持久化存儲在磁盤中。
用戶更新數據必須首先寫入HLog后,才能寫入MemStore緩存。
當Region啟動時,首先檢查HLog是否存在未合并的更新操作;若是則先執行更新操作,合并到MemStore和StoreFile中,然后生成一個新的空的HLog文件。
9.HBase性能優化方法(了解)
-
行鍵
行鍵是按照字典序存儲,因此,設計行鍵時,要充分利用這個排序特點,將經常一起讀取的數據存儲到一塊,將最近可能會被訪問的數據放在一塊。
舉個例子:如果最近寫入HBase表中的數據是最可能被訪問的,可以考慮將時間戳作為行鍵的一部分,由于是字典序排序,所以可以使用Long.MAX_VALUE - timestamp作為行鍵,這樣能保證新寫入的數據在讀取時可以被快速命中。
-
InMemory
創建表的時候,可以通過HColumnDescriptor.setInMemory(true)將表放到Region服務器的緩存中,保證在讀取的時候被cache命中。
-
Max Version
創建表的時候,可以通過HColumnDescriptor.setMaxVersions(int maxVersions)設置表中數據的最大版本,如果只需要保存最新版本的數據,那么可以設置setMaxVersions(1)。
-
Time To Live
創建表的時候,可以通過HColumnDescriptor.setTimeToLive(int timeToLive)設置表中數據的存儲生命期,過期數據將自動被刪除,例如如果只需要存儲最近兩天的數據,那么可以設置setTimeToLive(2 * 24 * 60 * 60)。
10.HBase常用shell命令
# 啟動hbase shell hadoop@ubuntu:~$ hbase shell# 創建表t:列族為f,列族版本號為5 hbase> create 't1',{NAME => 'f1',VERSIONS => 5}# 創建表t:列族為f1、f2、f3,兩種方式等價 hbase> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'} hbase> create 't1', 'f1', 'f2', 'f3'# 創建表t:將表根據分割算法HexStringSplit 分布在15個Region里 hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}# 創建表t:指定Region的切分點 hbase> create 't1', 'f1', SPLITS => ['10', '20', '30', '40']-------------------------------------------------------------------------------------------------------- # help 查看create命令的幫助信息 hbase(main):002:0> help "create" Creates a table. Pass a table name, and a set of column family # create命令的描述 specifications (at least one), and, optionally, table configuration. Column specification can be a simple string (name), or a dictionary (dictionaries are described below in main help output), necessarily including NAME attribute. Examples:Create a table with namespace=ns1 and table qualifier=t1 #指定namespace與hbase> create 'ns1:t1', {NAME => 'f1', VERSIONS => 5}Create a table with namespace=default and table qualifier=t1hbase> create 't1', {NAME => 'f1'}, {NAME => 'f2'}, {NAME => 'f3'}hbase> # The above in shorthand would be the following:hbase> create 't1', 'f1', 'f2', 'f3'hbase> create 't1', {NAME => 'f1', VERSIONS => 1, TTL => 2592000, BLOCKCACHE => true}hbase> create 't1', {NAME => 'f1', CONFIGURATION => {'hbase.hstore.blockingStoreFiles' => '10'}}hbase> create 't1', {NAME => 'f1', IS_MOB => true, MOB_THRESHOLD => 1000000, MOB_COMPACT_PARTITION_POLICY => 'weekly'}Table configuration options can be put at the end. Examples:hbase> create 'ns1:t1', 'f1', SPLITS => ['10', '20', '30', '40']hbase> create 't1', 'f1', SPLITS => ['10', '20', '30', '40']hbase> create 't1', 'f1', SPLITS_FILE => 'splits.txt', OWNER => 'johndoe'hbase> create 't1', {NAME => 'f1', VERSIONS => 5}, METADATA => { 'mykey' => 'myvalue' }hbase> # Optionally pre-split the table into NUMREGIONS, usinghbase> # SPLITALGO ("HexStringSplit", "UniformSplit" or classname)hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit'}hbase> create 't1', 'f1', {NUMREGIONS => 15, SPLITALGO => 'HexStringSplit', REGION_REPLICATION => 2, CONFIGURATION => {'hbase.hregion.scan.loadColumnFamiliesOnDemand' => 'true'}}hbase> create 't1', 'f1', {SPLIT_ENABLED => false, MERGE_ENABLED => false}hbase> create 't1', {NAME => 'f1', DFS_REPLICATION => 1}You can also keep around a reference to the created table:hbase> t1 = create 't1', 'f1'Which gives you a reference to the table named 't1', on which you can then call methods. -------------------------------------------------------------------------------------------------------# list 列出所有表 hbase> list# put 向表中指定的單元格添加數據 hbase> put 't1','row1','f1:c1',120000 # 通過表,行鍵,列族:列限定符進行定位,值為120000# get 通過指定坐標來獲取單元格的值 hbase(main):005:0> get 't1','row1','f1:c1' COLUMN CELL f1:c1 timestamp=1609810077099, value=120000 1 row(s) Took 0.0722 seconds # delete 刪除表中指定單元格的數據 hbase(main):021:0> delete 't1','row1','f1:c1',timestamp=1609810077099# scan 瀏覽表的信息 hbase(main):004:0> scan 't1' # 這時會顯示表t1中的所有行# scan 瀏覽某個單元格的數據 hbase(main):010:0> scan 't1',{COLUMNS => 'f1:c1'}# alter 修改列族模式 hbase(main):011:0> alter 't1',NAME => 'f2' # 向表t1中增加列族f2 hbase(main):014:0> alter 't1',NAME => 'f2',METHOD => 'delete' # 將表t1中的列族f2刪除# count 統計表中的行數 hbase(main):015:0> count 't1' # 統計t1的行數# describe 顯示表的相關信息 hbase(main):017:0> describe 't1' Table t1 is ENABLED t1 COLUMN FAMILIES DESCRIPTION {NAME => 'f1', VERSIONS => '5', EVICT_BLOCKS_ON_CLOSE => 'false', NEW_VERSION_BEHAVIOR => 'false', KEEP_DELETE D_CELLS => 'FALSE', CACHE_DATA_ON_WRITE => 'false', DATA_BLOCK_ENCODING => 'NONE', TTL => 'FOREVER', MIN_VERSI ONS => '0', REPLICATION_SCOPE => '0', BLOOMFILTER => 'ROW', CACHE_INDEX_ON_WRITE => 'false', IN_MEMORY => 'fal se', CACHE_BLOOMS_ON_WRITE => 'false', PREFETCH_BLOCKS_ON_OPEN => 'false', COMPRESSION => 'NONE', BLOCKCACHE = > 'true', BLOCKSIZE => '65536'} 1 row(s) QUOTAS 0 row(s) Took 0.1104 seconds# enable/disable 使表有效或無效 hbase(main):015:0> disable 't1'# drop 刪除表,這里需要注意刪除表之前要先使用disable使這個表無效,這也是為了防止誤刪 hbase(main):023:0> disable 't1' Took 0.8378 seconds hbase(main):024:0> drop 't1' Took 0.4997 seconds# exists 判斷某個表是否存在 hbase(main):025:0> exists 't1' Table t1 does not exist Took 0.0231 seconds => false# truncate 使表無效并清空該表的數據 hbase(main):029:0> truncate 'teacher' Truncating 'teacher' table (it may take a while): Disabling table... Truncating table... Took 2.1127 secondshbase(main):031:0> exists 'teacher' # truncate后查看該表是否存在 Table teacher does exist Took 0.0156 seconds => true # 還存在# 查看HBase集群狀態 hbase(main):026:0> status 1 active master, 0 backup masters, 1 servers, 0 dead, 5.0000 average load Took 0.0582 seconds # 退出hbase shell hbase> exit第五章 NoSQL
1.nosql 的含義
2.nosql 興起的原因
- 關系數據庫已經無法滿足Web2.0的需求
(1)無法滿足海量數據的管理需求
(2)無法滿足數據高并發的需求
(3)無法滿足高可擴展性和高可用性的需求
- 關系數據庫的關鍵特性包括完善的事務機制和高效的查詢機制,到了Web2.0時代卻成了雞肋
(1)Web2.0網站系統通常不要求嚴格的數據庫事務
(2)Web2.0并不要求嚴格的讀寫實時性
(3)Web2.0通常不包含大量復雜的SQL查詢(去結構化,存儲空間換取更好的查詢性能)
3.nosql與關系型數據庫的比較
| 數據庫原理 | 完全支持 | 部分支持 | RDBMS有關系代數理論作為基礎;NoSQL沒有統一的理論基礎 |
| 一致性 | 強一致性 | 弱一致性 | RDBMS嚴格遵守事務ACID模型,可以保證事務強一致性;很多NoSQL數據庫放松了對事務ACID四性的要求,而是遵守BASE模型,只能保證最終一致性 |
| 數據庫模式 | 固定 | 靈活 | RDBMS需要定義數據庫模式,嚴格遵守數據定義和相關約束條件;NoSQL不存在數據庫模式,可以自由靈活定義并存儲各種不同類型的數據 |
| 數據完整性 | 容易實現 | 很難實現 | 任何一個RDBMS都可以很容易實現數據完整性,比如通過主鍵或者非空約束來實現實體完整性,通過主鍵、外鍵來實現參照完整性,通過約束或者觸發器來實現用戶自定義完整性;但是,在NoSQL數據庫卻無法實現 |
| 數據規模 | 大 | 超大 | RDBMS很難實現橫向擴展,縱向擴展的空間也比較有限,性能會隨著數據規模的增大而降低;NoSQL可以很容易通過添加更多設備來支持更大規模的數據 |
| 擴展性 | 一般 | 好 | RDBMS很難實現橫向擴展,縱向擴展的空間也比較有限;NoSQL在設計之初就充分考慮了橫向擴展的需求,可以很容易通過添加廉價設備實現擴展 |
| 可用性 | 好 | 很好 | RDBMS在任何時候都以保證數據一致性為優先目標,其次才是優化系統性能,隨著數據規模的增大,RDBMS為了保證嚴格的一致性,只能提供相對較弱的可用性;大多數NoSQL都能提供較高的可用性 |
| 查詢效率 | 快 | 可以實現高效的簡單查詢,但是不具備高度結構化查詢等特性,復雜查詢的性能不盡人意 | RDBMS借助于索引機制可以實現快速查詢(包括記錄查詢和范圍查詢);很多NoSQL數據庫沒有面向復雜查詢的索引,雖然NoSQL可以使用MapReduce來加速查詢,但是,在復雜查詢方面的性能仍然不如RDBMS |
| 標準化 | 是 | 否 | RDBMS已經標準化(SQL);NoSQL還沒有行業標準,不同的NoSQL數據庫都有自己的查詢語言,很難規范應用程序接口 StoneBraker認為:NoSQL缺乏統一查詢語言,將會拖慢NoSQL發展 |
| 技術支持 | 高 | 低 | RDBMS經過幾十年的發展,已經非常成熟,Oracle等大型廠商都可以提供很好的技術支持;NoSQL在技術支持方面仍然處于起步階段,還不成熟,缺乏有力的技術支持。 |
| 可維護性 | 復雜 | 復雜 | RDBMS需要專門的數據庫管理員(DBA)維護;NoSQL數據庫雖然沒有DBMS復雜,也難以維護。 |
總結
(1)關系數據庫
優勢:以完善的關系代數理論作為基礎,有嚴格的標準,支持事務ACID四性,借助索引機制可以實現高效的查詢,技術成熟,有專業公司的技術支持
劣勢:可擴展性較差,無法較好支持海量數據存儲,數據模型過于死板、無法較好支持Web2.0應用,事務機制影響了系統的整體性能等
(2)NoSQL數據庫
優勢:可以支持超大規模數據存儲,靈活的數據模型可以很好地支持Web2.0應用,具有強大的橫向擴展能力等
劣勢:缺乏數學理論基礎,復雜查詢性能不高,大都不能實現事務強一致性,很難實現數據完整性,技術尚不成熟,缺乏專業團隊的技術支持,維護較困難等
4.nosql的4大類型、各自的典型應用
典型的NoSQL數據庫通常包括鍵值數據庫、列族數據庫、文檔數據庫和圖形數據庫。
各類型的產品:
- 鍵值數據庫
| 數據模型 | 鍵/值對 鍵是一個字符串對象 值可以是任意類型的數據,比如整型、字符型、數組、列表、集合等 |
| 典型應用 | 涉及頻繁讀寫、擁有簡單數據模型的應用 內容緩存,比如會話、配置文件、參數、購物車等 存儲配置和用戶數據信息的移動應用 |
| 優點 | 擴展性好,靈活性好,大量寫操作時性能高 |
| 缺點 | 無法存儲結構化信息,條件查詢效率較低 |
| 不適用情形 | 不是通過鍵而是通過值來查:鍵值數據庫根本沒有通過值查詢的途徑 需要存儲數據之間的關系:在鍵值數據庫中,不能通過兩個或兩個以上的鍵來關聯數據 需要事務的支持:在一些鍵值數據庫中,產生故障時,不可以回滾 |
| 使用者 | 百度云數據庫(Redis)、GitHub(Riak)、BestBuy(Riak)、Twitter(Redis和Memcached)、StackOverFlow(Redis)、Instagram (Redis)、Youtube(Memcached)、Wikipedia(Memcached) |
-
列族數據庫
相關產品BigTable、HBase、Cassandra、HadoopDB、GreenPlum、PNUTS 數據模型 列族 典型應用 分布式數據存儲與管理 數據在地理上分布于多個數據中心的應用程序 可以容忍副本中存在短期不一致情況的應用程序 擁有動態字段的應用程序 擁有潛在大量數據的應用程序,大到幾百TB的數據 優點 查找速度快,可擴展性強,容易進行分布式擴展,復雜性低 缺點 功能較少,大都不支持強事務一致性 不適用情形 需要ACID事務支持的情形,Cassandra等產品就不適用 使用者 Ebay(Cassandra)、Instagram(Cassandra)、NASA(Cassandra)、Twitter(Cassandra and HBase)、Facebook(HBase)、Yahoo!(HBase) -
文檔數據庫
相關產品MongoDB、CouchDB、Terrastore、ThruDB、RavenDB、SisoDB、RaptorDB、CloudKit、Perservere、Jackrabbit 數據模型 鍵/值 值(value)是版本化的文檔 典型應用 存儲、索引并管理面向文檔的數據或者類似的半結構化數據 比如,用于后臺具有大量讀寫操作的網站、使用JSON數據結構的應用、使用嵌套結構等非規范化數據的應用程序 優點 性能好(高并發),靈活性高,復雜性低,數據結構靈活 提供嵌入式文檔功能,將經常查詢的數據存儲在同一個文檔中 既可以根據鍵來構建索引,也可以根據內容構建索引 缺點 缺乏統一的查詢語法 不適用情形 在不同的文檔上添加事務。文檔數據庫并不支持文檔間的事務,如果對這方面有需求則不應該選用這個解決方案 使用者 百度云數據庫(MongoDB)、SAP (MongoDB)、Codecademy (MongoDB)、Foursquare (MongoDB)、NBC News (RavenDB) -
圖形數據庫
相關產品Neo4J、OrientDB、InfoGrid、Infinite Graph、GraphDB 數據模型 圖結構 典型應用 專門用于處理具有高度相互關聯關系的數據,比較適合于社交網絡、模式識別、依賴分析、推薦系統以及路徑尋找等問題 優點 靈活性高,支持復雜的圖形算法,可用于構建復雜的關系圖譜 缺點 復雜性高,只能支持一定的數據規模 使用者 Adobe(Neo4J)、Cisco(Neo4J)、T-Mobile(Neo4J)
5.nosql 的三大基石
-
CAP
所謂的CAP指的是:
C(Consistency):一致性,是指任何一個讀操作總是能夠讀到之前完成的寫操作的結果,也就是在分布式環境中,多點的數據是一致的,或者說,所有節點在同一時間具有相同的數據
A:(Availability):可用性,是指快速獲取數據,可以在確定的時間內返回操作結果,保證每個請求不管成功或者失敗都有響應;
P(Tolerance of Network Partition):分區容忍性,是指當出現網絡分區的情況時(即系統中的一部分節點無法和其他節點進行通信),分離的系統也能夠正常運行,也就是說,系統中任意信息的丟失或失敗不會影響系統的繼續運作。
CAP理論告訴我們,一個分布式系統不可能同時滿足一致性、可用性和分區容忍性這三個需求,最多只能同時滿足其中兩個,正所謂“魚和熊掌不可兼得”。
-
BASE
說起BASE(Basically Availble, Soft-state, Eventual consistency),不得不談到ACID。
一個數據庫事務具有ACID四性:
A(Atomicity):原子性,是指事務必須是原子工作單元,對于其數據修改,要么全都執行,要么全都不執行
C(Consistency):一致性,是指事務在完成時,必須使所有的數據都保持一致狀態
I(Isolation):隔離性,是指由并發事務所做的修改必須與任何其它并發事務所做的修改隔離
D(Durability):持久性,是指事務完成之后,它對于系統的影響是永久性的,該修改即使出現致命的系統故障也將一直保持BASE的基本含義是基本可用(Basically Availble)、軟狀態(Soft-state)和最終一致性(Eventual consistency):
1.基本可用:基本可用,是指一個分布式系統的一部分發生問題變得不可用時,其他部分仍然可以正常使用,也就是允許分區失敗的情形出現。
2.軟狀態:“軟狀態(soft-state)”是與“硬狀態(hard-state)”相對應的一種提法。數據庫保存的數據是“硬狀態”時,可以保證數據一致性,即保證數據一直是正確的。“軟狀態”是指狀態可以有一段時間不同步,具有一定的滯后性。
3.最終一致性:一致性的類型包括強一致性和弱一致性,二者的主要**區別在于高并發的數據訪問操作下,后續操作是否能夠獲取最新的數據。**對于強一致性而言,當執行完一次更新操作后,后續的其他讀操作就可以保證讀到更新后的最新數據;反之,如果不能保證后續訪問讀到的都是更新后的最新數據,那么就是弱一致性。而最終一致性只不過是弱一致性的一種特例,允許后續的訪問操作可以暫時讀不到更新后的數據,但是經過一段時間之后,必須最終讀到更新后的數據。
最常見的實現最終一致性的系統是DNS(域名系統)。一個域名更新操作根據配置的形式被分發出去,并結合有過期機制的緩存;最終所有的客戶端可以看到最新的值。 -
最終一致性
最終一致性根據更新數據后各進程訪問到數據的時間和方式的不同,又可以區分為:
因果一致性:如果進程A通知進程B它已更新了一個數據項,那么進程B的后續訪問將獲得A寫入的最新值。而與進程A無因果關系的進程C的訪問,仍然遵守一般的最終一致性規則
“讀己之所寫”一致性:可以視為因果一致性的一個特例。當進程A自己執行一個更新操作之后,它自己總是可以訪問到更新過的值,絕不會看到舊值
單調讀一致性:如果進程已經看到過數據對象的某個值,那么任何后續訪問都不會返回在那個值之前的值會話一致性:它把訪問存儲系統的進程放到會話(session)的上下文中,只要會話還存在,系統就保證“讀己之所寫”一致性。如果由于某些失敗情形令會話終止,就要建立新的會話,而且系統保證不會延續到新的會話
單調寫一致性:系統保證來自同一個進程的寫操作順序執行。系統必須保證這種程度的一致性,否則就非常難以編程了
擴展知識
當處理CAP的問題時,可以有幾個明顯的選擇:
1.CA:也就是強調一致性(C)和可用性(A),放棄分區容忍性(P),最簡單的做法是把所有與事務相關的內容都放到同一臺機器上。很顯然,這種做法會嚴重影響系統的可擴展性。傳統的關系數據庫(MySQL、SQL Server和PostgreSQL),都采用了這種設計原則,因此,擴展性都比較差
2.CP:也就是強調一致性(C)和分區容忍性(P),放棄可用性(A),當出現網絡分區的情況時,受影響的服務需要等待數據一致,因此在等待期間就無法對外提供服務
3.AP:也就是強調可用性(A)和分區容忍性(P),放棄一致性(C),允許系統返回不一致的數據
6.MongoDB基本概念
在mongodb中基本的概念是文檔、集合、數據庫
| database | database | 數據庫 |
| table | collection | 數據庫表/集合 |
| row | document | 數據記錄行/文檔 |
| column | field | 數據字段/域 |
| index | index | 索引 |
| table joins | 表連接,MongoDB不支持 | |
| primary key | primary key | 主鍵,MongoDB自動將_id字段設置為主鍵 |
第六章 云數據庫
1.云數據庫的概念
云數據庫是部署和虛擬化在云計算環境中的數據庫。
云數據庫是在云計算的大背景下發展起來的一種新興的共享基礎架構的方法,它極大地增強了數據庫的存儲能力,消除了人員、硬件、軟件的重復配置,讓軟、硬件升級變得更加容易。
云數據庫具有高可擴展性、高可用性、采用多租形式和支持資源有效分發等特點。
2.云數據庫的特性
(1)動態可擴展:用戶按需擴展
(2)高可用性:云數據庫具有故障自動單點切換、數據庫自動備份等功能
(3)較低的使用代價:RDS支付的費用遠低于自建數據庫所需的成本
(4)易用性:提供WEB界面進行配置、操作數據庫實例
(5)高性能
(6)免維護:有專門的維護人員
(7)安全
3.云數據庫廠商以及各自的產品
| Amazon | Dynamo、SimpleDB、RDS |
| Google Cloud SQL | |
| Microsoft | Microsoft SQL Azure |
| Oracle | Oracle Cloud |
| Yahoo! | PNUTS |
| Vertica | Analytic Database v3.0 for the Cloud |
| EnerpriseDB | Postgres Plus in the Cloud |
| 阿里 | 阿里云RDS |
| 百度 | 百度云數據庫 |
| 騰訊 | 騰訊云數據庫 |
第七章 MapReduce
1.MapReduce與傳統并行計算框架比較
| 集群架構/容錯性 | 共享式(共享內存/共享存儲),容錯性差 | 非共享式,容錯性好 |
| 硬件/價格/擴展性 | 刀片服務器、高速網、SAN,價格貴,擴展性差 | 普通PC機,便宜,擴展性好 |
| 編程/學習難度 | what-how,難 | what,簡單 |
| 適用場景 | 實時、細粒度計算、計算密集型 | 非實時、批處理、數據密集型 |
2.MapReduce的2個特點
分而治之、計算向數據靠攏
3.MapReduce流程
4.MapReduce的體系結構
下面是Hadoop1.x中的體系結構,但我覺得不會考:
MapReduce體系結構主要由四個部分組成,分別是:Client、JobTracker、TaskTracker以及Task。
1)Client
用戶編寫的MapReduce程序通過Client提交到JobTracker端
用戶可通過Client提供的一些接口查看作業運行狀態
2)JobTracker
JobTracker負責資源監控和作業調度
JobTracker 監控所有TaskTracker與Job的健康狀況,一旦發現失敗,就將相應的任務轉移到其他節點
JobTracker 會跟蹤任務的執行進度、資源使用量等信息,并將這些信息告訴任務調度器(TaskScheduler),而調度器會在資源出現空閑時,選擇合適的任務去使用這些資源
3)TaskTracker
TaskTracker 會周期性地通過“心跳”將本節點上資源的使用情況和任務的運行進度匯報給JobTracker,同時接收JobTracker 發送過來的命令并執行相應的操作(如啟動新任務、殺死任務等)
TaskTracker 使用“slot”等量劃分本節點上的資源量(CPU、內存等)。一個Task 獲取到一個slot 后才有機會運行,而Hadoop調度器的作用就是將各個TaskTracker上的空閑slot分配給Task使用。slot 分為Map slot 和Reduce slot 兩種,分別供MapTask 和Reduce Task 使用
4)Task
Task 分為Map Task 和Reduce Task 兩種,均由TaskTracker 啟動
5.map與reduce并行度的決定因素
maptask并行度由輸入數據分片數量決定;reducetask并行度由輸入數據分區數量決定。
6.WordCount代碼
package com.ht.wordcount;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import java.io.IOException;public class WordCount {public static class WordCountMapper extends Mapper<LongWritable,Text,Text, IntWritable>{IntWritable intWritable = new IntWritable(1);@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {// 1.讀取數據String line = value.toString();// 2.切片String[] splits = line.split("\t");// 3.輸出Text text = new Text();for (String split : splits) {text.set(split);context.write(text, intWritable);}}}public static class WordCountReducer extends Reducer<Text,IntWritable,Text,IntWritable>{@Overrideprotected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {// 1.讀取數據 <k,list<v1,v2,...,vn>>int sumVal = 0;for (IntWritable val:values){sumVal += val.get();}// 2.輸出數據context.write(key,new IntWritable(sumVal));}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {// 1.hadoop運行信息Configuration configuration = new Configuration();// 2.獲取hadoop實例String jobName = "WordCount";Job job = Job.getInstance(configuration, jobName);// 3.設置程序的本地jar包job.setJarByClass(WordCount.class);// 4.關聯mapper和reducerjob.setMapperClass(WordCountMapper.class);job.setReducerClass(WordCountReducer.class);// 5.設置mapper的輸出kvjob.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(IntWritable.class);// 6.設置reducer的輸出kv(最終輸出)job.setOutputKeyClass(Text.class);job.setOutputValueClass(IntWritable.class);// 7.設置作業輸入輸出路徑Path inputPath = new Path("D:\\Document\\temp\\wordcount\\input.txt");Path outputPath = new Path("D:\\Document\\temp\\wordcount\\output");// 獲取hdfs文件系統實例FileSystem fileSystem = FileSystem.get(configuration);if(fileSystem.exists(outputPath)){fileSystem.delete(outputPath,true);}// 8.設置輸入輸出格式FileInputFormat.addInputPath(job,inputPath);FileOutputFormat.setOutputPath(job, outputPath);// 9.查看作業運行情況System.out.println("job " + jobName + "is running...");// 若成功打印1,不成功打印0System.out.println(job.waitForCompletion(true) ? 1:0);} }第八章 Hadoop 2.x
1.Hadoop1.0的不足、改進(了解)
Hadoop1.0的核心組件(僅指MapReduce和HDFS,不包括Hadoop生態系統內的Pig、Hive、HBase等其他組件),主要存在以下不足:
- 抽象層次低,需人工編碼
- 表達能力有限
- 開發者自己管理作業(Job)之間的依賴關系
- 難以看到程序整體邏輯
- 執行迭代操作效率低
- 資源浪費(Map和Reduce分兩階段執行)
- 實時性差(適合批處理,不支持實時交互式)
| HDFS | 單一名稱節點,存在單點故障問題 | 設計了HDFS HA,提供名稱節點熱備機制 |
| HDFS | 單一命名空間,無法實現資源隔離 | 設計了HDFS Federation,管理多個命名空間 |
| MapReduce | 資源管理效率低 | 設計了新的資源管理框架YARN |
2.HA的工作原理
HDFS HA(High Availability)是為了解決單點故障問題。HA集群設置兩個名稱節點,“活躍(Active)”和“待命(Standby)”,兩種名稱節點的狀態同步,可以借助于一個共享存儲系統來實現。
一旦活躍名稱節點出現故障,就可以立即切換到待命名稱節點,Zookeeper確保一個名稱節點在對外服務。名稱節點維護映射信息,數據節點同時向兩個名稱節點匯報信息。
?
3.YARN設計思路
到了Hadoop2.0以后,MapReduce1.0中的資源管理調度功能,被單獨分離出來形成了YARN,它是一個純粹的資源管理調度框架,而不是一個計算框架。
?
4.YARN的發展目標
一個企業當中同時存在各種不同的業務應用場景,需要采用不同的計算框架
MapReduce實現離線批處理
使用Impala實現實時交互式查詢分析
使用Storm實現流式數據實時分析
使用Spark實現迭代計算
這些產品通常來自不同的開發團隊,具有各自的資源調度管理機制,為了避免不同類型應用之間互相干擾,企業就需要把內部的服務器拆分成多個集群,分別安裝運行不同的計算框架,即“一個框架一個集群”
導致問題:集群資源利用率低、數據無法共享、維護代價高
YARN的目標就是實現“一個集群多個框架”,即在一個集群上部署一個統一的資源調度管理框架YARN,在YARN之上可以部署其他各種計算框架。
由YARN為這些計算框架提供統一的資源調度管理服務,并且能夠根據各種計算框架的負載需求,調整各自占用的資源,實現集群資源共享和資源彈性收縮。
可以實現一個集群上的不同應用負載混搭,有效提高了集群的利用率;不同計算框架可以共享底層存儲,避免了數據集跨集群移動。
?
第九章 Spark
1.Spark的特點
- 運行速度快(相較于Hadoop)
- 通用性(具有完整的技術棧)
- 易用性(多種方式使用)
- 運行模式多樣
2.Spark支持的語言
scala、java、python、r
3.scala的特點
- 函數式編程,具備強大的并發性,更好地支持分布式系統
- 兼容java
- 語法簡潔優雅
- 支持高效的交互式編程
- 面向對象
- scala是spark的開發語言
4.Spark與Hadoop的比較
| 表達能力有限 | Spark的計算模式也屬于MapReduce,但不局限于Map和Reduce操作,還提供了多種數據集操作類型,編程模型比Hadoop MapReduce更靈活 |
| 磁盤I/O開銷大 | Spark提供了內存計算,可將中間結果放到內存中,對于迭代運算效率更高 |
| 延遲高 | Spark基于DAG的任務調度執行機制,要優于Hadoop MapReduce的迭代執行機制 |
5.Spark設計理念
一個技術棧滿足不同應用場景。
6.Spark的組件、組件的應用場景、時間跨度
| 復雜的批量數據處理 | 小時級 | MapReduce、Hive | Spark Core |
| 基于歷史數據的交互式查詢 | 分鐘級、秒級 | Impala、Dremel、Drill | Spark SQL |
| 基于實時數據流的數據處理 | 毫秒、秒級 | Storm、S4 | Spark Streaming、Structured Streaming |
| 基于歷史數據的數據挖掘 | - | Mahout | MLlib |
| 圖結構數據的處理 | - | Pregel、Hama | GraphX |
7.RDD基本概念
RDD是彈性分布式數據集,一種基于內存的數據共享模型。
8.Spark程序的運行流程
用戶提交代碼生成一個Job — sparkcontext向集群資源管理器注冊并申請資源 — 集群資源管理器分配Executor資源給這個Job — Executor向sparkcontext申請任務 — sparkcontext分發任務 — Executor執行完成,返回給sparkcontext
?
9.RDD的兩種算子
transformation 轉換算子、action行動算子
10.血緣關系
多個RDD之間一系列的依賴關系稱為血緣關系。
11.RDD的特性
1.A list of partitions 可分區
RDD是一個由多個partition(某個節點里的某一片連續的數據)組成的的list;將數據加載為RDD時,一般會遵循數據的本地性(一般一個hdfs里的block會加載為一個partition)。
2.A function for computing each split 分區計算
一個函數計算每一個分片,RDD的每個partition上面都會有function,也就是函數應用,其作用是實現RDD之間partition的轉換。
3.A list of dependencies on other RDDs 依賴關系
RDD會記錄它的依賴 ,依賴還具體分為寬依賴和窄依賴,但并不是所有的RDD都有依賴。為了容錯(重算,cache,checkpoint),也就是說在內存中的RDD操作時出錯或丟失會進行重算。
4.Optionally,a Partitioner for Key-value RDDs 自定義分區
可選項,如果RDD里面存的數據是key-value形式,則可以傳遞一個自定義的Partitioner進行重新分區,例如這里自定義的Partitioner是基于key進行分區,那則會將不同RDD里面的相同key的數據放到同一個partition里面
5.Optionally, a list of preferred locations to compute each split on 數據的本地性
最優的位置去計算,也就是數據的本地性。
12.RDD的依賴關系
兩個相鄰RDD之間的關系。有兩種,分為“窄依賴”和“寬依賴”。經過Shuffle過程的稱為寬依賴。
13.stage的劃分
如果有shuffle過程即寬依賴,那么就會創建一個新的stage。
14.Spark的三種部署方式
spark獨立部署、On YARN、On Meros
15.Spark編程
-
SparkContext:程序運行的上下文環境
-
SparkSession:用于創建會話,其實是封裝了 SQLContext 和 HiveContext
-
sparksql提供了DataFrame\DataSet,Spark SQL執行計劃生成和優化都由Catalyst(函數式關系查詢優化框架)負責
-
df與rdd的區別:
RDD是分布式的 Java對象的集合,但是,對象內部結構對于RDD而言卻是不可知的;
DataFrame是一種以RDD為基礎的分布式數據集,提供了詳細的結構信息。 -
df的創建、隱式轉換
DataFrame可以從文件中讀取并創建、還可以由RDD轉換得到。SparkSession.implicits $是Scala中的隱式方法,用于將常見的Scala對象轉換為DataFrames。RDD對象可以通過隱式轉換轉為DataFrame。
-
rdd轉換為df的2種方式
利用反射機制推斷RDD模式、利用編程方式定義RDD模式
-
WordCount
1.RDD
package Com.HT.Finalimport org.apache.spark.{SparkConf, SparkContext}object WordCount {def main(args: Array[String]): Unit = {val sparkConf = new SparkConf().setMaster("local[2]").setAppName("spark")val sparkContext = new SparkContext(sparkConf)// 步驟:讀取文件,分割,map,reduceByKeyval rdd = sparkContext.textFile("D:\\Document\\temp\\wordcount\\input.txt") // 讀取文件// 方法1:不簡化//val rdd1 = rdd.flatMap(line => line.split("\t")).map(word => (word, 1)).reduceByKey((a, b) => a + b)// 方法2:簡化 (scala的至簡原則)val rdd2 = rdd.flatMap(_.split("\t")).map((_, 1)).reduceByKey(_+_) rdd2.collect().foreach(println)sparkContext.stop()} }2.Spark SQL
package Com.HT.Final.wordcountimport org.apache.spark.sql.{DataFrame, Dataset, SparkSession} import org.apache.spark.{SparkConf, SparkContext}object WordCount_sparksql {def main(args: Array[String]): Unit = {//1.創建Sparksession,獲取SparkContextval sparkConf: SparkConf = new SparkConf().setAppName("WordCount").setMaster("local[*]")val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()val sparkContext: SparkContext = spark.sparkContextsparkContext.setLogLevel("WARN")import spark.implicits._ //DS和DF的底層都是RDD,下面的計算過程中底層涉及到他們的相互轉換,所以需要導入隱式轉換//2.讀取文件,讀取為Dataset[String]val fileDS: Dataset[String] = spark.read.textFile("D:\\Do,cument\\temp\\wordcount\\input.txt")//3.對文件數據進行處理 -> Dataset[String] val wordDS: Dataset[String] = fileDS.flatMap(line => line.split("\t")) // 分割符\t//4.注冊表wordDS.createOrReplaceTempView("word_count")//5.書寫sql語句val sql:String = "select value as word,count(*) as counts from word_count group by word order by counts desc"//6.執行sql語句,查看內容val dataFrame: DataFrame = spark.sql(sql)dataFrame.show()//7.關閉資源sparkContext.stop()spark.stop()} }3.Spark Streaming
package Com.HT.Final.wordcount import org.apache.spark.streaming.{Seconds, StreamingContext} import org.apache.spark.{SparkConf}object WordCount_sparkstreaming {def main(args: Array[String]): Unit = {//創建一個sparkconf對象,其中local[2]表示任務運行在本地且需要兩個CUPval sparkconf = new SparkConf().setMaster("local[2]").setAppName("FileWordCount") //這里必須至少有2個線程,一個用于接收數據,一個用于統計//創建StreamingContext對象,rdd批次處理間隔設為5秒val ssc = new StreamingContext(sparkconf,Seconds(5))// 方法1:從hdfs中讀取文件,生成DStreamval lines = ssc.textFileStream("D:\\Document\\temp\\wordcount\\input.txt") // 必須用流的形式寫入到這個目錄形成文件才能被監測到// 方法2:通過Socket端口監聽并接收數據,設置主機名、端口、持久化存儲級別(如果數據在內存中放不下,則溢寫到磁盤) // val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK) val res = lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) //用空格分割單詞并計數res.print() //顯示結果//啟動spark streamingssc.start()//等待直到任務停止ssc.awaitTermination()ssc.stop()} }4.Structured Streaming
package Com.HT.Final.wordcountimport org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}object WordCount_structuredstreaming {def main(args: Array[String]): Unit = { //1.創建SparkSessionval spark = SparkSession.builder().master("local[*]").appName("structuredstreaming").getOrCreate()spark.sparkContext.setLogLevel("WARN") // 設置日志級別import spark.implicits._ // 導入隱式轉換//2.數據集的生成,數據讀取val source: DataFrame = spark.readStream.format("socket") // 設置socket讀取流數據.option("host","localhost") // 監聽主機的ip地址或主機名.option("port",9999) // 指定監聽主機的端口.load()// 3.數據的處理:行轉換成一個個單詞// 方法1:Dataset[String] -> Dataset[(String, Int)] -> KeyValueGroupedDataset[String, (String, Int)] -> Dataset[(String, Long)]// groupByKey :按Key進行分組,返回[K,Iterable[V]] // val words: Dataset[(String, Long)] = source.as[String].flatMap(_.split(" ")).map((_,1)).groupByKey(_._1).count()// 方法2:Dataset[String] -> RelationalGroupedDataset -> DataFrame// groupBy:新建一個RelationalGroupedDataset,而這個方法提供count(),max(),agg()等方法。// groupByKey 后返回的類是 KeyValueGroupedDataset ,它里面所提供的操作接口不如 groupBy 返回的 RelationalGroupedDataset 所提供的接口豐富。val words: DataFrame = source.as[String].flatMap(_.split(" ")).groupBy("value").count()//4.結果集的生成輸出words.writeStream.outputMode(OutputMode.Complete()).format("console") // 設置在控制臺顯示結果.start() // 開啟.awaitTermination() // 等待直到任務停止} } -
案例1:求TOP值
package Com.HT.Finalimport org.apache.spark.{SparkConf, SparkContext}object TopN {def main(args: Array[String]): Unit = {// 設置環境val sparkConf = new SparkConf().setMaster("local").setAppName("TopN")val sparkContext = new SparkContext(sparkConf)// 讀取文件val rdd = sparkContext.textFile("D:\\Document\\temp\\rddfile\\TopN\\input.txt")// 過濾數據:長度小于多少、分割后長度小于多少val filterRDD = rdd.filter(line => (line.trim().length>0) && (line.split(",").length == 4))// 分割、排序、輸出var i = 1; // 最終輸出排名的序號val sortRdd = filterRDD.map(_.split(",")(2)) // 分隔每一行數據,RDD的類型變成Array[String],然后取索引為2的元素,就是要進行排序的數據.map(x => (x.toInt,"")) // 將該列數據的每一行都變為鍵值對RDD,鍵為數據,值為"".sortByKey(false) // 根據鍵進行降序排序.map(x => x._1) // 取排序后的那一列數據,只要鍵不要值.take(5) // 取出top5的數據.foreach(x => { // 遍歷打印println(i + "\t" + x)i+=1})} } -
案例2:求最大最小值
package Com.HT.Finalimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object MaxAndMinVal {def main(args: Array[String]): Unit = {// 設置環境val sparkConf = new SparkConf().setMaster("local").setAppName("MaxAndMinVal")val sparkContext = new SparkContext(sparkConf)// 讀取文件,讀取進來每一行都是一個字符串val lines: RDD[String] = sparkContext.textFile("D:\\Document\\temp\\rddfile\\maxandmin.txt")// 過濾、轉換、根據key進行分組、求最大最小值val rdd: Unit = lines.filter(line => line.trim.length > 0) // trim:刪除指定字符串的首尾空白符.map(line => ("key", line.toInt)).groupByKey() // 轉換為(“key”,value-list).map(line => {var minValue: Int = Integer.MAX_VALUEvar maxValue: Int = Integer.MIN_VALUEfor (num <- line._2) { // 遍歷value-list。line._2就是鍵值對(key,value-list)中的value-list,這里value-list就是<129,54,167,…,5,329,14,...>if (num < minValue) {minValue = num}if (num > maxValue) {maxValue = num}}(maxValue, minValue)}).collect().foreach(x => {println("最大值 = " + x._1)println("最小值 = " + x._2)})sparkContext.stop()} } -
案例3:文件排序
有多個輸入文件,每個文件中的每一行內容均為一個整數。要求讀取所有文件中的整數,進行排序后,輸出到一個新的文件中,輸出的內容個數為每行兩個整數,第一個整數為第二個整數的排序位次,第二個整數為原待排序的整數。
package Com.HT.Finalimport org.apache.spark.rdd.RDD import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}object FileSort {def main(args: Array[String]): Unit = {// 設置環境val sparkConf = new SparkConf().setMaster("local").setAppName("FileSort")val sparkContext = new SparkContext(sparkConf)// 讀取文件val rdd: RDD[String] = sparkContext.textFile("D:\\Document\\temp\\rddfile\\filesort",3)// 過濾、分割、排序、輸出var index = 0; // 第一列:序號val result: RDD[(Int, Int)] = rdd.filter(_.trim.length > 0) // 過濾長度不大于0的記錄.map(x => (x.trim.toInt, "")) // 將字符串rdd轉換類型為:(整型,"").partitionBy(new HashPartitioner(1)) // 將3個分區歸為一個:由入輸入文件有多個,產生不同的分區,為了生成序號,使用HashPartitioner將中間的RDD歸約到一起.sortByKey() // 按照key進行升序排序.map(kv => { // 輸出兩列index += 1println(index + "\t" + kv._1)(index, kv._1)})result.saveAsTextFile("D:\\Document\\temp\\rddfile\\filesortout") // 保存為一個文件// 關閉scsparkContext.stop()} } -
案例4:二次排序
對于一個給定的文件(數據如file1.txt所示),請對數據進行排序,首先根據第1列數據降序排序,如果第1列數據相等,則根據第2列數據降序排序。
spark程序:
package Com.HT.Final.TwoTimesSortimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object SecondarySort {def main(args: Array[String]): Unit = {// 設置配置信息、上下文環境val sparkConf = new SparkConf().setMaster("local").setAppName("SecondarySort")val sparkContext = new SparkContext(sparkConf)// 過濾、分割、轉換、二次排序(第一列降序,第一列相等的按照第二列降序排序)// 讀取文件val lines = sparkContext.textFile("D:\\Document\\temp\\rddfile\\secondarysort\\input.txt")val pairWithSortKey = lines.filter(line => line.trim.length>0) // 過濾.map(line => (new SecondarySortKey(line.split("\t")(0).toInt, line.split("\t")(1).toInt),line))// k-v,k是SecondarySortKey對象,規定了排序規則,v是原本輸入的一對數據// 根據鍵進行排序,這里會遵循 SecondarySortKey對象 的排序規則val sorted = pairWithSortKey.sortByKey(false)// 取出原本的一對數字組成的字符串val sortedResult = sorted.map(sortedLine => sortedLine._2)// 并打印sortedResult.collect().foreach (println)// 關閉scsparkContext.stop()} }SecondarySortKey:
package Com.HT.Final.TwoTimesSortimport org.apache.spark.{SparkConf, SparkContext}class SecondarySortKey(val first:Int,val second:Int) extends Ordered [SecondarySortKey] with Serializable {def compare(other:SecondarySortKey):Int = { // 實現compare方法,可以二次排序if (this.first - other.first !=0) { // first與other不相等this.first - other.first // 第一列降序排序} else { // first與other相等this.second - other.second // 第二列降序排序}} } -
案例5:連接操作
任務描述:在推薦領域有一個著名的開放測試集,下載鏈接是:http://grouplens.org/datasets/movielens/,該測試集包含三個文件,分別是ratings.dat、sers.dat、movies.dat,具體介紹可閱讀:README.txt。請編程實現:通過連接ratings.dat和movies.dat兩個文件得到平均得分超過4.0的電影列表,采用的數據集是:ml-1m。
package Com.HT.Finalimport org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext}object SparkJoin {def main(args: Array[String]): Unit = {// 設置上下文環境val sparkConf = new SparkConf().setAppName("SparkJoin").setMaster("local")val sparkContext = new SparkContext(sparkConf)//TODO 1.處理ratings數據:讀取、分割、抽取、計算、keyby// 讀取ratings文件為RDD,一共4列val ratingsRDD: RDD[String] = sparkContext.textFile("D:\\Document\\temp\\rddfile\\join\\ratings.rat")// 提取(第2列movieid電影id, 第3列rating電影評分) val idAndRatings = ratingsRDD.map(line => {val fileds = line.split("::") // 分割,得到字符串數組(fileds(1).toInt, fileds(2).toDouble) // 提取電影id和電影評分,索引分別為1和2})// KeyBy: 為各個元素,按指定的函數生成key,形成key-value的RDD。// 電影id + 計算電影的平均評分val movieIdAndAvgScoreKey = idAndRatings.groupByKey() // 根據電影id將電影評分進行分組.map(data => {val avg = data._2.sum / data._2.size // 求平均評分(data._1, avg) // 返回電影id和平均評分}).keyBy(tup => tup._1) // 設置key為 電影id, value為 電影id和平均分//TODO 2.處理電影信息的數據::讀取、分割、抽取、keyby// 讀取movies文件為RDD,一共3列val moviesRDD = sparkContext.textFile("D:\\Document\\temp\\rddfile\\join\\movies.dat")// 提取(第1列movieid電影id, 第2列moviename電影名稱) val movieskey = moviesRDD.map(line => { // movieskey:(1,(1,Toy Story (1995) ))val fileds = line.split("::") // 分割為 (1,Toy Story (1995))(fileds(0).toInt, fileds(1)) // 整型數,字符串}).keyBy(tup => tup._1) // 設置key為 電影id, value為電影id和電影名稱//TODO 3.連接、過濾、抽取輸出val joinResult = movieIdAndAvgScoreKey // 連接操作.join(movieskey).filter(f => f._2._1._2 > 4.0) // 過濾.map(f => (f._1, f._2._1._2, f._2._2._2) // 取出電影id,電影平均分,電影名稱)joinResult.saveAsTextFile("D:\\Document\\temp\\rddfile\\joinoutput")} }// KeyBy: 為各個元素,按指定的函數生成key,形成key-value的RDD。 -
史上最全的spark面試題 https://www.cnblogs.com/think90/p/11461367.html
第十章 流計算
1.流計算與批處理的區別
批處理:處理離線數據。單個處理數據量大,處理速度比流慢。
流計算:處理實時產生的數據。單次處理的數據量小,但處理速度更快。
2.文件流
Spark支持從兼容HDFS API的文件系統中讀取數據,創建數據流。就是上面 Spark Streaming程序里提到的文件流。
http://dblab.xmu.edu.cn/blog/1082-2/
https://blog.csdn.net/zhangdy12307/article/details/90379543
3.socket
Spark Streaming可以通過Socket端口監聽并接收數據,然后進行相應處理。
使用命令開啟socket監聽端口:nc -lk [port]
socket工作原理(應該不會考):
如果有問題可以在評論區提出,或者私信我。如果哪里有錯誤的,歡迎提出~
總結
以上是生活随笔為你收集整理的大数据技术原理与应用:期末考点总结的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: npm run build后如何打开in
- 下一篇: ubuntu100%快速安装搜狗输入法