大数据乘(tu)风(tou)破(bian)浪(qiang)之路
QUESTION1大數據能做什么?
大數據無處不在,大數據應用于各個行業,包括金融、汽車、餐飲、電信、能源、體能和娛樂等在內的社會各行各業都已經融入了大數據的印跡。
電商是最早利用大數據進行精準營銷的行業,除了精準營銷,電商可以依據客戶消費習慣來提前為客戶備貨,并利用便利店作為貨物中轉點,在客戶下單15分鐘內將貨物送上門,提高客戶體驗。
例如:馬云的菜鳥網絡宣稱的24小時完成在中國境內的送貨;以及劉強東宣傳未來京東將在15分鐘完成送貨上門都是基于客戶消費習慣的大數據分析和預測。
隨著大數據技術的應用,越來越多的金融企業也開始投身到大數據應用實踐中。
麥肯錫的一份研究顯示,金融業在大數據價值潛力指數中排名第一。
典型的案例有:花旗銀行利用IBM沃森電腦為財富管理客戶推薦產品;美國銀行利用客戶點擊數據集為客戶提供特色服務,如有競爭的信用額度;招商銀行利用客戶刷卡、存取款、電子銀行轉帳、微信評論等行為數據進行分析,每周給客戶發送針對性廣告信息,里面有顧客可能感興趣的產品和優惠信息。
可見,大數據在金融行業的應用可以總結為以下五個方面:精準營銷、風險管控、決策支持、效率提升以及產品設計。
大數據讓就醫、看病更簡單。隨著大數據在醫療行業的深度融合,大數據平臺積累了海量的病例、病例報告、治愈方案、藥物報告等信息資源,所有常見的病例、既往病例等都記錄在案,醫生通過有效、連續的診療記錄,能夠給病人優質、合理的診療方案。這樣不僅提高醫生的看病效率,而且能夠降低誤診率,從而讓患者在最短的時間接受最好的治療。
零售行業大數據應用有兩個層面,一個層面是零售行業可以了解客戶消費喜好和趨勢,進行商品的精準營銷,降低營銷成本。另一層面是依據客戶購買產品,為客戶提供可能購買的其它產品,擴大銷售額,也屬于精準營銷范疇。例如:美國零售業的傳奇故事——“啤酒與尿布”。
交通作為人類行為的重要組成和重要條件之一,對于大數據的感知也是最急迫的。目前,交通的大數據應用主要在兩個方面,一方面可以利用大數據傳感器數據來了解車輛通行密度,合理進行道路規劃包括單行線路規劃。另一方面可以利用大數據來實現即時信號燈調度,提高已有線路運行能力。
《黑貓警長》大家都很熟悉,它講述的是“黑貓警長”如何精明能干、對壞人窮追不舍、跌宕起伏的故事情節。拿到大數據時代背景下的話,雖然它也能體現“黑貓警長”的盡職盡責、聰明能干,但更多的會歸結到一個問題:為何還是如此的被動、低效?疾病可以預防,難道犯罪不能預防么?
答案是肯定的。國家正在將大數據技術用于輿情監控,其收集到的數據除了解民眾訴求,降低群體事件之外,還可以用于犯罪管理。
?
以上內容來自百度,舉個身邊的例子,ping哥在淘寶購買了一罐奶粉,淘寶會基于海量下單記錄推送他一些之前買過奶粉的用戶后續也會下單的商品,比如紙尿布,沒準還會推送酒。為什么會推送酒呢,結合他的性別和購買奶粉這種行為對他分析,極大概率是已婚男士,那關于酒的故事還要從一個女人開始說起。。。
?
?
QUESTION2大數據的數據從哪來?
1、埋點產生的用戶行為數據:用戶在使用產品過程中,與客戶端產品交互過程中產生的數據,比如頁面瀏覽、點擊、停留、評論、點贊、收藏等 , 互聯網核心指標PV、UV的統計基礎.
2、JavaEE后臺產生的業務數據.
3、爬蟲:就是模擬客戶端發送網絡請求,接收請求響應,一種按照一定的規則,自動地抓取互聯網信息的程序。只要瀏覽器能夠做的事情,原則上,爬蟲都能夠做到。簡單來說,爬蟲就是自動從網絡上收集信息的一種程序,復雜點來說,就是一整套關于數據請求、處理、存儲的程序
?俗話說的好,爬蟲學的好,監獄進的早。。。
QUESTION3 數據有了,怎么樣采集到分布式集群中呢?
多圖帶你認知采集通道
業務系統數據處理鏈路
?
?
前端埋點數據處理鏈路
?
1、Apache Flume
1、1 flume是干什么的?
flume 是一個分布式的數據收集系統,具有高可靠、高可用、事務管理、失敗重啟、聚合和傳輸等功能。數據處理速度快,完全可以用于生產環境。
Flume 的使用不只限于日志數據。因為數據源可以定制,flume 可以被用來傳輸大量事件數據,這些數據不僅僅包括網絡通訊數據、社交媒體產生的數據、電子郵件信息等等。
flume如何搜集日志?
我們把flume比作情報人員
(1)搜集信息
(2)獲取記憶信息
(3)傳遞報告間諜信息
flume是怎么完成上面三件事情的,三個組件:
agent
flume 的核心是 agent。agent 是一個 java 進程,運行在日志收集端,通過 agent 接收日志,然后暫存起來,再發送到目的地。 每臺機器運行一個 agent。 agent 里面可以包含多個 source,channel,sink。
source
source 是數據的收集端,負責將數據捕獲后進行特殊的格式化,將數據封裝到 event 里,然后將事件推入 channel 中。flume 提供了很多內置的 source,支持 avro,log4j,syslog 等等。如果內置的 source 無法滿足環境的需求,flume 還支持自定義 source。
channel
channel 是連接 source 和 sink 的組件,大家可以將它看做一個數據的緩沖區(數據隊列),它可以將事件暫存到內存中也可以持久化到本地磁盤上, 直到 sink 處理完該事件。兩個較為常用的 channel,MemoryChannel 和 FileChannel。
sink
sink 從 channel 中取出事件,然后將數據發到別處,可以向文件系統、數據庫、hadoop、kafka,也可以是其他 agent 的 source。
配置單個組件
流中的每一個組件(source、channel、sink)都有自己的名稱、類型以及一系列配置屬性。例如,一個 Avro source 需要配置 hostname (或者 IP 地址)以及端口號來接收數據。一個內存模式 channel 可以有最大隊列長度的屬性("capacity": channel 中最大容納多少事件)。一個 HDFS slink 則需要知道文件系統的 URL(hdfs://**)、文件落地的路徑、文件回滾的評率("hdfs.rollInterval": 每隔多少秒將零時文件回滾成最終文件保存到 HDFS 中)。所有這些關于各個組件的屬性需要在配置文件中進行指定。
?
?
Flume還有多種拓撲模式 方便靈活搭建,下圖實現了復制和多路復用的功能
?
?
一個簡單的測試
這里,我們給出一個配置文件的示例,該示例為 flume 單節點部署的配置方式。
?# example.conf: A single-node Flume configuration
?# Name the components on this agent
?a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = netcat
?a1.sources.r1.bind = localhost
a1.sources.r1.port =?44444
# Describe the sink
?a1.sinks.k1.type = logger
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
?a1.channels.c1.capacity =?1000
a1.channels.c1.transactionCapacity =?100
# Bind the source and sink to the channel
?a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
bin/flume-ng agent -n a1 -c conf/ -f job/exp.conf -Dflume.root.logger=info,console
看看這個配置文件,我們可以發現這個 agent 的名稱是 a1。其中該 agent 的 source 監聽 44444 端口。channel 采用內存模式,而 slink 直接輸出數據到 控制臺上(logger)。配置文件指定了各個組件的名稱,并描述了它們的類型以及其他屬性。當然,一個配置文件可以配置多個 agent 屬性,當希望運行指定 agent 進程時,我們需要在命令行中顯示的給出該 agent 的名稱。
?
2、Apache Kafka
初識 Kafka
?
Kafka是啥?用Kafka官方的話來說就是:
Kafka is used for building real-time data pipelines and streaming apps. It is horizontally scalable, fault-tolerant,?wicked fast, and runs in production in thousands of companies.
大致的意思就是,這是一個實時數據處理系統,可以橫向擴展、高可靠,而且還變態快,已經被很多公司使用。
那么什么是實時數據處理系統呢?顧名思義,實時數據處理系統就是數據一旦產生,就要能快速進行處理的系統。
對于實時數據處理,我們最常見的,就是消息中間件了,也叫MQ(Message Queue,消息隊列),也有叫Message Broker的。
首先,我將從消息中間件的角度,帶大家看看Kafka的內部結構,看看它是如何做到橫向擴展、高可靠的同時,還能變態快的。
為什么需要消息中間件
消息中間件的作用主要有兩點:
·?解耦消息的生產和消費。
·?緩沖。
想象一個場景,你的一個創建訂單的操作,在訂單創建完成之后,需要觸發一系列其他的操作,比如進行用戶訂單數據的統計、給用戶發送短信、給用戶發送郵件等等,就像這樣:
createOrder(...){
?...
?statOrderData(...);
?sendSMS();
?sendEmail();}
代碼這樣寫似乎沒什么問題,可是過了一段時間,你給系統引進了一個用戶行為分析服務,它也需要在訂單創建完成之后,進行一個分析用戶行為的操作,而且隨著系統的逐漸壯大,創建訂單之后要觸發的操作也就越來越多,代碼也漸漸膨脹成這樣:
createOrder(...){
?...
?statOrderData(...);
?sendSMS();
?sendEmail();
?// new operation?statUserBehavior(...);
?doXXX(...);
?doYYY(...);
?// more and more operations?...}
導致代碼越來越膨脹的癥結在于,消息的生產和消費耦合在一起了。createOrder方法不僅僅要負責生產“訂單已創建”這條消息,還要負責處理這條消息。
這就好比espn的記者,在知道勇士拿到NBA冠軍之后,拿起手機,翻開勇士球迷通訊錄,給球迷一個一個打電話,告訴他們,勇士奪冠了。
事實上,espn的記者只需要在他們官網發布這條消息,然后球迷自行訪問espn,去上面獲取這條新聞;又或者球迷訂閱了espn,那么訂閱系統會主動把發布在官網的消息推送給球迷。
同樣,createOrder也需要一個像espn官網那樣的載體,也就是消息中間件,在訂單創建完成之后,把一條主題為“orderCreated”的消息,放到消息中間件去就ok了,不必關心需要把這條消息發給誰。這就完成了消息的生產。
至于需要在訂單創建完成之后觸發操作的服務,則只需要訂閱主題為“orderCreated”的消息,在消息中間件出現新的“orderCreated”消息時,就會收到這條消息,然后進行相應的處理。
因此,通過使用消息中間件,上面的代碼也就簡化成了:
createOrder(...){
?...
?sendOrderCreatedMessage(...);}
以后如果在訂單創建之后有新的操作需要執行,這串代碼也不需要修改,只需要給對消息進行訂閱即可。
另外,通過這樣的解耦,消費者在消費數據時更加的靈活,不必每次消息一產生就要馬上去處理(雖然通常消費者側也會有線程池等緩沖機制),可以等自己有空了的時候,再過來消息中間件這里取數據進行處理。這就是消息中間件帶來的緩沖作用。
kafka 的設計目標
Kafka一代 - 消息隊列
從上面的描述,我們可以看出,消息中間件之所以可以解耦消息的生產和消費,主要是它提供了一個存放消息的地方——生產者把消息放進來,消費者在從中取出消息進行處理。
那么這個存放消息的地方,應該采用什么數據結構呢?
在絕大多數情況下,我們都希望先發送進來的消息,可以先被處理(FIFO),這符合大多數的業務邏輯,少數情況下我們會給消息設置優先級。不管怎樣,對于消息中間件來說,一個先進先出的隊列,是非常合適的數據結構:
?
?
那么要怎樣保證消息可以被順序消費呢?
消費者過來獲取消息時,每次都把index=0的數據返回過去,然后再刪除index=0的那條數據?
很明顯不行,因為訂閱了這條消息的消費者數量,可能是0,也可能是1,還可能大于1。如果每次消費完就刪除了,那么其他訂閱了這條消息的消費者就獲取不到這條消息了。
事實上,Kafka會對數據進行持久化存儲(至于存放多長時間,這是可以配置的),消費者端會記錄一個offset,表明該消費者當前消費到哪條數據,所以下次消費者想繼續消費,只需從offset+1的位置繼續消費就好了。
消費者甚至可以通過調整offset的值,重新消費以前的數據。
那么這就是Kafka了嗎?不,這只是一條非常普通的消息隊列,我們姑且叫它為Kafka一代吧。
這個Kafka一代用一條消息隊列實現了消息中間件,這樣的簡單實現存在不少問題:
·?吞吐量低。我們把全部消息都放在一條隊列了,請求一多,它肯定應付不過來。
由此就引申出了Kafka二代。
Kafka二代 - Partition
二代Kafka引入了Partition的概念,也就是采用多條隊列, 每條隊列里面的消息都是相同的topic:
?
我們可以看到,每個Partition中的消息都是有序的,生產的消息被不斷追加到Partition log上,其中的每一個消息都被賦予了一個唯一的offset值。
發布到Kafka主題的每條消息包括鍵值和時間戳。消息到達服務器端的指定分區后,都會分配到一個自增的偏移量。原始的消息內容和分配的偏移量以及其他一些元數據信息最后都會存儲到分區日志文件中。消息的鍵也可以不用設置,這種情況下消息會均衡地分布到不同的分區。
1) 分區的原因?
(1)方便在集群中擴展,每個Partition可以通過調整以適應它所在的機器,而一個topic又可以有多個Partition組成,因此整個集群就可以適應任意大小的數據了;?
(2)可以提高并發,因為可以以Partition為單位讀寫了。
傳統消息系統在服務端保持消息的順序,如果有多個消費者消費同一個消息隊列,服務端會以消費存儲的順序依次發送給消費者。但由于消息是異步發送給消費者的,消息到達消費者的順序可能是無序的,這就意味著在并行消費時,傳統消息系統無法很好地保證消息被順序處理。雖然我們可以設置一個專用的消費者只消費一個隊列,以此來解決消息順序的問題,但是這就使得消費處理無法真正執行。
Kafka比傳統消息系統有更強的順序性保證,它使用主題的分區作為消息處理的并行單元。Kafka以分區作為最小的粒度,將每個分區分配給消費者組中不同的而且是唯一的消費者,并確保一個分區只屬于一個消費者,即這個消費者就是這個分區的唯一讀取線程。那么,只要分區的消息是有序的,消費者處理的消息順序就有保證。每個主題有多個分區,不同的消費者處理不同的分區,所以Kafka不僅保證了消息的有序性,也做到了消費者的負載均衡。
2)分區的原則?
(1)指定了patition,則直接使用;?
(2)未指定patition但指定key,通過對key的value進行hash出一個patition?
(3)patition和key都未指定,使用輪詢選出一個patition。
Kafka二代足夠完美了嗎?當然不是,我們雖然通過Partition提升了性能,但是我們忽略了一個很重要的問題——高可用。
萬一機器掛掉了怎么辦?單點系統總是不可靠的。我們必須考慮備用節點和數據備份的問題。
Kafka三代 - Broker集群
很明顯,為了解決HA問題,我們需要集群。
Kafka對集群的支持也是非常友好的。在Kafka中,集群里的每個實例叫做Broker,就像這樣:
?
每個partition不再只有一個,而是有一個leader(紅色)和多個replica(藍色),生產者根據消息的topic和key值,確定了消息要發往哪個partition之后(假設是p1),會找到partition對應的leader(也就是broker2里的p1),然后將消息發給leader,leader負責消息的寫入,并與其余的replica進行同步。
一旦某一個partition的leader掛掉了,那么只需提拔一個replica出來,讓它成為leader就ok了,系統依舊可以正常運行。
通過Broker集群的設計,我們不僅解決了系統高可用的問題,還進一步提升了系統的吞吐量,因為replica同樣可以為消費者提供數據查找的功能。
Kafka沒那么簡單
以上只是帶大家初步認識一下Kafka,很多細節并沒有深入討論,比如:
1、Kafka的消息結構?
我們只知道Kafka內部是一個消息隊列,但是隊列里的元素長什么樣,包含了哪些消息呢?
2、Zookeeper和Kafka的關系?
如果玩過Kafka的,就會發現,我們在使用Kafka時,需要先啟動一個ZK,那么這個ZK的作用到底是什么呢?
3、數據可靠性和重復消費
生產者把消息發給Kafka,發送過程中掛掉、或者Kafka保存消息時發送異常怎么辦?
同理,消費者獲取消費時發生異常怎么辦?
甚至,如果消費者已經消費了數據,但是修改offset時失敗了,導致重復消費怎么辦?
等等這些異常場景,都是Kafka需要考慮的。
4、 pull or push
消費者側在獲取消息時,是通過主動去pull消息呢?還是由Kafka給消費者push消息?
這兩種方式各自有什么優劣?
5、 如何提高消費者處理性能
還是之前的訂單創建的例子,訂單創建后,你要給用戶發送短信,現在你發現由于你只有一個消費者在發送短信,忙不過來,怎么辦?這就有了Kafka里頭的消費者組(Consumer Group)的設計。
……
終極問題:一條消息從生產,到被消費,完整流程是怎樣的?
如果能詳盡透徹地回答這個問題,那你對Kafka的理解也就非常深入了。
?
QUESTION4數據采集到了,如何進行存儲及計算呢??
離線數據開發
1、APACHE Hadoop
初識Hadoop
hadoop是什么?Hadoop是一種分析和處理大數據的軟件平臺,是Apache的一個用Java語言所實現的開源軟件的框架,在大量計算機組成的集群當中實現了對于海量的數據進行的分布式計算。
1.1MapReduce是什么
1、概念理解
??????Hadoop Map/Reduce是一個使用簡易的軟件框架,基于它寫出來的應用程序能夠運行在由上千個商用機器組成的大型集群上,并以一種可靠容錯的方式并行處理上T級別的數據集。
2、Map(映射)
??????“Map”:主結點讀入輸入數據,把它分成可以用相同方法解決的小數據塊(這里是一個分而治之的思想),然后把這些小數據塊分發到不同的工作節點上(worder nodes)上,每一個工作節點(worder node)循環做同樣的事,這就行成了一個樹行結構,而每一個葉子節點有來處理每一個具體的小數據塊,再把這些處理結果返回給父節點。
3、Reduce(歸約)
??????“Reduce”:主結節得到所有子節點的處理結果,然后把所有結果組合并且返回到輸出。
4、個人理解
??????簡單的來講,map就是分,reduce就是合。怎么理解呢?我們來看個例子。?
??????我們將100噸磚,從山東運到北京,如果我們用一輛能裝1噸的大卡車來運,一天跑一個來回,那么我們需要100天,可是如果我們用10輛這樣的車來做同樣的事情,那么我們10天就可以完成了。雖然在現實生活中,我們增加了車費等一系列支出,可能不太劃算,但是對于計算機來說,我們的成本是相當低的。所以在迎接大數據的到來時,MapReduce將大大提高的計算的速度,特別方便。
?
?
?
1.2Shuffle - 奇跡發生的地方
?
上面的流程是整個MapReduce最全工作流程,但是Shuffle過程只是從第7步開始到第16步結束,具體Shuffle過程詳解,如下:
1)MapTask收集我們的map()方法輸出的kv對,放到內存緩沖區中
2)從內存緩沖區不斷溢出本地磁盤文件,可能會溢出多個文件
3)多個溢出文件會被合并成大的溢出文件
4)在溢出過程及合并的過程中,都要調用Partitioner進行分區和針對key進行排序
5)ReduceTask根據自己的分區號,去各個MapTask機器上取相應的結果分區數據
6)ReduceTask會取到同一個分區的來自不同MapTask的結果文件,ReduceTask會將這些文件再進行合并(歸并排序)
7)合并成大文件后,Shuffle的過程也就結束了,后面進入ReduceTask的邏輯運算過程(從文件中取出一個一個的鍵值對Group,調用用戶自定義的reduce()方法)
3.注意
Shuffle中的緩沖區大小會影響到MapReduce程序的執行效率,原則上說,緩沖區越大,磁盤io的次數越少,執行速度就越快。
緩沖區的大小可以通過參數調整,參數:io.sort.mb默認100M。
?
?
2、基于Apache Hive 的離線數倉
2.1 什么是Hive
Hive是Facebook開源的基于Hadoop的一個數據倉庫工具,可以將結構化的數據文件映射為一張數據庫表,并提供完整的sql查詢功能,可以將sql語句轉換為MapReduce任務進行運行。其優點是學習成本低,可以通過類SQL語句快速實現簡單的MapReduce統計,不必開發專門的MapReduce應用,十分適合數據倉庫的統計分析。
Hive是建立在 Hadoop 上的數據倉庫基礎構架。它提供了一系列的工具,可以用來進行數據提取轉化加載(ETL),這是一種可以存儲、查詢和分析存儲在 Hadoop 中的大規模數據的機制。Hive 定義了簡單的類 SQL 查詢語言,稱為 HQL,它允許熟悉 SQL 的用戶查詢數據。同時,這個語言也允許熟悉 MapReduce 開發者的開發自定義的 mapper 和 reducer 來處理內建的 mapper 和 reducer 無法完成的復雜的分析工作。
?
2.2、Hive本質
將HQL轉換為MapReduce程序
體現在:
1、處理數據在HDFS上
2、通過MR實現
3、執行程序運行在Yarn上
?
2.3、數倉建模
ODS:通常而言,原始數據的種類是非常豐富的,我們可能從幾十個業務方把數據拉回來,然后格式化放到HDFS上。但很多時候,情況并不這么簡單,雖然有很多的損壞數據、臟數據等是不需要統計的,但是我們需要來看為什么會產生臟數據,這時候原始數據就會提供很好的樣板。再有些時候,針對一些流量作弊的數據,如果按照統一規則,很容易就給過濾掉了,然后運營就問過來為什么對方提供的數據與我們的差異這么多大,這時候同樣需要去看原始日志。因而,ODS的意義,在于保存最完整的數據現場,便于一些特殊場景下的問題排查使用。
?
DWD:如果采集的數據沒有問題了,我們這里就需要做數據的預處理了。例如存在HDFS上的標準格式,我們就用字符串的格式來統一存儲。還有時候因為場景要求,需要直接轉成Parquent、ORC等列存格式,也需要在這里做轉換。但預處理并不是簡單的轉換格式,還需要處理一些臟數據,例如字段缺失、格式錯誤、亂碼、空值,等等,在這一層處理好之后,后續的計算便不需要再擔心各種各樣的異常情況,對于開發效率的提升有著極大的幫助。有些時候還要發揮一些特定作用,因為業務的意外導致各種各樣的錯誤數據進來,也是時有發生的。比如客戶消費了,金額總得是正的吧,但如果業務那邊產生了一些錯誤,需要將金額設置成負值,雖然業務那邊好處理了,但數據這里就頭疼了。所以還需要經常打一些補丁,來處理金額負值這種異常情況。所以看起來DWD像是多余的一層,但當業務場景足夠復雜之后,它所發揮的作用還是很大的。這里數據預處理主要采用MR來進行,基本上遇不到數據傾斜等問題。
?
DWS:當所有的數據都存好了,處理完臟數據之后,下一步我們就需要考慮如何處理和組織統計邏輯了。數據倉庫之所以叫數據倉庫,正是因為DWS層的重要。數據模型有很多,如:3NF范式模型、維度模型(星座、雪花、星型)、Data Vault等,但最常用的還是星型模型。通常我們會根據主題來進行表數據的統計,這里還有一個常用的說法,叫“中間層”。例如我們數據層次自上往下分別是:用戶、廣告投放計劃、計劃詳情,用戶本身有行業、主體公司等屬性,廣告投放計劃包括了單元、創意等屬性,計劃詳情包括了投放類型、投放地域等屬性。那么我們在這個DWS層,就需要針對所有可能的維度,包括用戶、行業、主體公司、廣告投放計劃、單元、創意、計劃詳情、投放類型、投放地域做統計,每個類型都盡可能的冗余維度信息,例如用戶維度的統計要把行業、主體公司等維度冗余進來,放到一張表里。這么做雖然特別違反三范式的原則,也違反很多模型,但是冗余盡可能多的信息,對于提高下游計算的速度、減少運算數據量、簡化業務邏輯、合并計算單元等具有特別大的好處。
?
ADS:當需求足夠多時,我們要提供的報表就不是幾十張的概念了,而是成百上千張,這么多的表怎么保證數據的一致性呢?怎么保證需求響應的速度呢?基本上都是ADS層需要面臨的問題。在前一個層次DWS中,我們把所有的主題都盡可能多的冗余了維度信息,因此這里需要盡量從單一中間層表中進行數據統計,中間層的數據一致性,就代表了最終業務數據的一致性。響應速度同理,在某些不得不關聯的業務場景下,因為中間層的存在,使得數據量減少了很多,需求響應速度也就提升了很多。
?
DIM:維度信息
3、實時數據開發組件-Flink
?1.1 ?初識Flink
Flink起源于Stratosphere項目,Stratosphere是在2010~2014年由3所地處柏林的大學和歐洲的一些其他的大學共同進行的研究項目,2014年4月Stratosphere的代碼被復制并捐贈給了Apache軟件基金會,參加這個孵化項目的初始成員是Stratosphere系統的核心開發人員,2014年12月,Flink一躍成為Apache軟件基金會的頂級項目。
在德語中,Flink一詞表示快速和靈巧,項目采用一只松鼠的彩色圖案作為logo,這不僅是因為松鼠具有快速和靈巧的特點,還因為柏林的松鼠有一種迷人的紅棕色,而Flink的松鼠logo擁有可愛的尾巴,尾巴的顏色與Apache軟件基金會的logo顏色相呼應,也就是說,這是一只Apache風格的松鼠。
Flink Logo
Flink項目的理念是:“Apache Flink是為分布式、高性能、隨時可用以及準確的流處理應用程序打造的開源流處理框架”。
Apache Flink是一個框架和分布式處理引擎,用于對無界和有界數據流進行有狀態計算。Flink被設計在所有常見的集群環境中運行,以內存執行速度和任意規模來執行計算。
1.2 ?Flink的重要特點
1.2.1 ?事件驅動型(Event-driven)
事件驅動型應用是一類具有狀態的應用,它從一個或多個事件流提取數據,并根據到來的事件觸發計算、狀態更新或其他外部動作。比較典型的就是以kafka為代表的消息隊列幾乎都是事件驅動型應用。
與之不同的就是SparkStreaming微批次,如圖:
事件驅動型:
1.2.2 流與批的世界觀
批處理的特點是有界、持久、大量,非常適合需要訪問全套記錄才能完成的計算工作,一般用于離線統計。
流處理的特點是無界、實時, ?無需針對整個數據集執行操作,而是對通過系統傳輸的每個數據項執行操作,一般用于實時統計。
在spark的世界觀中,一切都是由批次組成的,離線數據是一個大批次,而實時數據是由一個一個無限的小批次組成的。
而在flink的世界觀中,一切都是由流組成的,離線數據是有界限的流,實時數據是一個沒有界限的流,這就是所謂的有界流和無界流。
無界數據流:無界數據流有一個開始但是沒有結束,它們不會在生成時終止并提供數據,必須連續處理無界流,也就是說必須在獲取后立即處理event。對于無界數據流我們無法等待所有數據都到達,因為輸入是無界的,并且在任何時間點都不會完成。處理無界數據通常要求以特定順序(例如事件發生的順序)獲取event,以便能夠推斷結果完整性。
有界數據流:有界數據流有明確定義的開始和結束,可以在執行任何計算之前通過獲取所有數據來處理有界流,處理有界流不需要有序獲取,因為可以始終對有界數據集進行排序,有界流的處理也稱為批處理。
這種以流為世界觀的架構,獲得的最大好處就是具有極低的延遲。
2?Flink運行時的組件
Flink運行時架構主要包括四個不同的組件,它們會在運行流處理應用程序時協同工作:作業管理器(JobManager)、資源管理器(ResourceManager)、任務管理器(TaskManager),以及分發器(Dispatcher)。因為Flink是用Java和Scala實現的,所以所有組件都會運行在Java虛擬機上。每個組件的職責如下:
l?作業管理器(JobManager)
控制一個應用程序執行的主進程,也就是說,每個應用程序都會被一個不同的JobManager所控制執行。JobManager會先接收到要執行的應用程序,這個應用程序會包括:作業圖(JobGraph)、邏輯數據流圖(logical dataflow graph)和打包了所有的類、庫和其它資源的JAR包。JobManager會把JobGraph轉換成一個物理層面的數據流圖,這個圖被叫做“執行圖”(ExecutionGraph),包含了所有可以并發執行的任務。JobManager會向資源管理器(ResourceManager)請求執行任務必要的資源,也就是任務管理器(TaskManager)上的插槽(slot)。一旦它獲取到了足夠的資源,就會將執行圖分發到真正運行它們的TaskManager上。而在運行過程中,JobManager會負責所有需要中央協調的操作,比如說檢查點(checkpoints)的協調。
l?資源管理器(ResourceManager)
主要負責管理任務管理器(TaskManager)的插槽(slot),TaskManger插槽是Flink中定義的處理資源單元。Flink為不同的環境和資源管理工具提供了不同資源管理器,比如YARN、Mesos、K8s,以及standalone部署。當JobManager申請插槽資源時,ResourceManager會將有空閑插槽的TaskManager分配給JobManager。如果ResourceManager沒有足夠的插槽來滿足JobManager的請求,它還可以向資源提供平臺發起會話,以提供啟動TaskManager進程的容器。另外,ResourceManager還負責終止空閑的TaskManager,釋放計算資源。
l?任務管理器(TaskManager)
Flink中的工作進程。通常在Flink中會有多個TaskManager運行,每一個TaskManager都包含了一定數量的插槽(slots)。插槽的數量限制了TaskManager能夠執行的任務數量。啟動之后,TaskManager會向資源管理器注冊它的插槽;收到資源管理器的指令后,TaskManager就會將一個或者多個插槽提供給JobManager調用。JobManager就可以向插槽分配任務(tasks)來執行了。在執行過程中,一個TaskManager可以跟其它運行同一應用程序的TaskManager交換數據。
l?分發器(Dispatcher)
可以跨作業運行,它為應用提交提供了REST接口。當一個應用被提交執行時,分發器就會啟動并將應用移交給一個JobManager。由于是REST接口,所以Dispatcher可以作為集群的一個HTTP接入點,這樣就能夠不受防火墻阻擋。Dispatcher也會啟動一個Web UI,用來方便地展示和監控作業執行的信息。Dispatcher在架構中可能并不是必需的,這取決于應用提交運行的方式。
?
3.1 Window及基于體溫數據的Demo
3.1.1?Window概述
streaming流式計算是一種被設計用于處理無限數據集的數據處理引擎,而無限數據集是指一種不斷增長的本質上無限的數據集,而window是一種切割無限數據為有限塊進行處理的手段。
Window是無限數據流處理的核心,Window將一個無限的stream拆分成有限大小的”buckets”桶,我們可以在這些桶上做計算操作。
3.1.2 Window類型
Window可以分成兩類:
??CountWindow:按照指定的數據條數生成一個Window,與時間無關。
??TimeWindow:按照時間生成Window。
對于TimeWindow,可以根據窗口實現原理的不同分成三類:滾動窗口(Tumbling Window)、滑動窗口(Sliding Window)和會話窗口(Session Window)。
1.?滾動窗口(Tumbling Windows)
將數據依據固定的窗口長度對數據進行切片。
特點:時間對齊,窗口長度固定,沒有重疊。
滾動窗口分配器將每個元素分配到一個指定窗口大小的窗口中,滾動窗口有一個固定的大小,并且不會出現重疊。例如:如果你指定了一個5分鐘大小的滾動窗口,窗口的創建如下圖所示:
圖 滾動窗口
適用場景:適合做BI統計等(做每個時間段的聚合計算)。
2.?滑動窗口(Sliding Windows)
滑動窗口是固定窗口的更廣義的一種形式,滑動窗口由固定的窗口長度和滑動間隔組成。
特點:時間對齊,窗口長度固定,可以有重疊。
滑動窗口分配器將元素分配到固定長度的窗口中,與滾動窗口類似,窗口的大小由窗口大小參數來配置,另一個窗口滑動參數控制滑動窗口開始的頻率。因此,滑動窗口如果滑動參數小于窗口大小的話,窗口是可以重疊的,在這種情況下元素會被分配到多個窗口中。
例如,你有10分鐘的窗口和5分鐘的滑動,那么每個窗口中5分鐘的窗口里包含著上個10分鐘產生的數據,如下圖所示:
圖 滑動窗口
適用場景:對最近一個時間段內的統計(求某接口最近5min的失敗率來決定是否要報警)。
3.?會話窗口(Session Windows)
由一系列事件組合一個指定時間長度的timeout間隙組成,類似于web應用的session,也就是一段時間沒有接收到新數據就會生成新的窗口。
特點:時間無對齊。
session窗口分配器通過session活動來對元素進行分組,session窗口跟滾動窗口和滑動窗口相比,不會有重疊和固定的開始時間和結束時間的情況,相反,當它在一個固定的時間周期內不再收到元素,即非活動間隔產生,那個這個窗口就會關閉。一個session窗口通過一個session間隔來配置,這個session間隔定義了非活躍周期的長度,當這個非活躍周期產生,那么當前的session將關閉并且后續的元素將被分配到新的session窗口中去。
Window-demo ?實時輸出五秒內體溫最高的人的信息
package com.morant.apitest
?
import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
?
?
object WindowTest {
? def main(args: Array[String]): Unit = {
?
? ? val env = StreamExecutionEnvironment.getExecutionEnvironment
? ?env.setParallelism(1)
? ? env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
? ? env.getConfig.setAutoWatermarkInterval(100)
?
?
? ? ? ? val inputStream: DataStream[String] = env.readTextFile("D:\\DEVELOP_CODE\\workspace\\FlinkTest\\src\\main\\resources\\Temperature.txt")
// ? ?val inputStream = env.socketTextStream("172.19.177.124", 7777)
? ?// Transform操作 ? ?val dataStream: DataStream[Temperature] = inputStream
? ? ? .map(data => {
? ? ? ? val dataArray = data.split(",")
? ? ? ? Temperature(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
? ? ? })
? ? ? // ? ? ?.assignAscendingTimestamps(_.timestamp * 1000L) ? ? ?// 對亂序數據分配時間戳和watermark ? ? ?.assignTimestampsAndWatermarks( new BoundedOutOfOrdernessTimestampExtractor[Temperature](Time.seconds(1)) {
? ? ? ? override def extractTimestamp(t: Temperature): Long = t.timestamp * 1000L ? ? ?} )
?
? ? // 統計每個溫度計器每5秒內的溫度最大值 ? ?val processedStream = dataStream
? ? ? .keyBy(_.id)
? ? ? // ? ? ?.window(TumblingProcessingTimeWindows.of(Time.hours(1))) ? ? ?.timeWindow(Time.seconds(5)) // 定義長度為5秒的滾動窗口 ? ? ?.reduce((curMaxTempData, newData) =>
? ? ? ? Temperature(curMaxTempData.id, ? ? ? ? ?newData.timestamp, ? ? ? ? ?newData.temperature.max(curMaxTempData.temperature)
? ? ? ? )
? ? ? )
? ? // ? ? ?.reduce()
? ?processedStream.print()
? ? env.execute("window test")
? }
}
4.1?側輸出流(SideOutput)及體溫大于37.5放入測輸出流的Demo
大部分的DataStream API的算子的輸出是單一輸出,也就是某種數據類型的流。除了split算子,可以將一條流分成多條流,這些流的數據類型也都相同。process function的side outputs功能可以產生多條流,并且這些流的數據類型可以不一樣。一個side output可以定義為OutputTag[X]對象,X是輸出流的數據類型。process function可以通過Context對象發射一個事件到一個或者多個side outputs。
?
SideOutPut ?將體溫高于37.5的信息再測輸出流中輸出
case class Temperature( id: String, timestamp: Long, temperature: Double )
object SideOutputTest {
? def main(args: Array[String]): Unit = {
? ? val env = StreamExecutionEnvironment.getExecutionEnvironment ? ?env.setParallelism(1)
?
? ? ? ? val inputStream: DataStream[String] = env.readTextFile("D:\\DEVELOP_CODE\\workspace\\FlinkTest\\src\\main\\resources\\Temperature.txt")
// ? ?val inputStream = env.socketTextStream("192.168.1.101", 7777)
? ?// Transform操作 ? ?val dataStream: DataStream[Temperature] = inputStream
? ? ? .map(data => {
? ? ? ? val dataArray = data.split(",")
? ? ? ? Temperature(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
? ? ? })
?
? ? val highTempStream = dataStream
? ? ? .process(new SplitTempMonitor())
?
? ? highTempStream.print("low")
? ? highTempStream.getSideOutput(new OutputTag[String]("high-temp")).print("high")
?
? ? env.execute("process function test")
? }
}
?
// 自定義process function,實現分流操作class SplitTempMonitor() extends ProcessFunction[Temperature, Temperature]{
? override def processElement(value: Temperature, ctx: ProcessFunction[Temperature, Temperature]#Context, out: Collector[Temperature]): Unit = {
? ? // 判斷當前數據的溫度值,如果在37.5以上,輸出到側輸出流 ? ?if( value.temperature > 37.5 ){
? ? ? ctx.output(new OutputTag[String]("high-temp"), value.id+"->有病")
? ? } else{
? ? ? // 37.5度以下的數據,輸出到主流 ? ? ?out.collect(value)
? ? }
? }
}
?
?
?
總結
以上是生活随笔為你收集整理的大数据乘(tu)风(tou)破(bian)浪(qiang)之路的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 编码与DNA存储——DNA码的构造
- 下一篇: 人活着就是在对抗熵增 | 熵增启示录