1. Introduction to Flink Stream Processing
Flink's stream-processing API is called DataStream. It provides high-throughput, low-latency, real-time stream processing while guaranteeing Exactly-Once semantics.
2. Flink's Time Model
Flink provides three notions of time: Event Time, Processing Time, and Ingestion Time. At the implementation level there are only two: Processing Time and Event Time; Ingestion Time is essentially a special case of Event Time. A figure in the official documentation illustrates the difference between the three:
Event Time: the time at which the event was produced; the data carries its own timestamp, e.g. '2016/06/17 11:04:00.960'.
Ingestion Time: the time at which the data enters Flink, i.e. the timestamp taken when a record enters the source operator.
Processing Time: the system clock, unrelated to any timestamp inside the data itself, i.e. the time at which the window computation runs (this is the default).
Regarding Event Time, note that it is the time at which the data was produced: in your program you must first tell Flink which column serves as the Event Time column, then assign timestamps (Timestamp) and emit watermarks (WaterMark) so that Flink can track Event Time progress. Put simply, the Event Time column becomes the stream's clock. Since watermarks exist to track Event Time, and events can arrive out of order (late elements) due to network delays or program errors, watermark generation comes in two flavors, ordered and out-of-order; a sketch of both follows.
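A minimal sketch of the two flavors. The 5-second bound and the class name BoundedLatenessAssigner are illustrative choices, not part of the original programs, and parsedStream stands for the parsed tuple stream used in the Event Time demo below; the periodic-watermark interface shown here is the one available in the Flink 1.x Scala API:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

// Ordered (ascending) event time: each new element's timestamp is >= the previous one,
// so the watermark can simply trail the latest timestamp.
val ordered = parsedStream.assignAscendingTimestamps(_._2)

// Out-of-order event time: tolerate elements up to maxDelayMs late by holding the
// watermark back behind the highest timestamp seen so far.
class BoundedLatenessAssigner(maxDelayMs: Long)
    extends AssignerWithPeriodicWatermarks[(Long, Long, String, Long)] {
  // start high enough that the first watermark does not underflow
  private var maxTimestamp = Long.MinValue + maxDelayMs

  override def extractTimestamp(element: (Long, Long, String, Long),
                                previousElementTimestamp: Long): Long = {
    maxTimestamp = math.max(maxTimestamp, element._2)
    element._2
  }

  override def getCurrentWatermark(): Watermark = new Watermark(maxTimestamp - maxDelayMs)
}

val unordered = parsedStream.assignTimestampsAndWatermarks(new BoundedLatenessAssigner(5000L))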
An example of a late element: data is produced as time passes, so it is naturally generated in ascending time order, and when Flink uses the Event Time model the computation should in theory also see the data in ascending order. But if a "delayed" record arrives after its time window has already closed, that record will not be computed correctly.
Stream processing may be unable to handle such records correctly in real time, because the WaterMark cannot wait indefinitely for late elements to arrive; one remedy is to correct the already-computed results with a subsequent batch job.
3. The Steps of a Flink Streaming Program
There are 5 steps in total (a minimal skeleton follows the list):
1. Obtain the execution environment for the DataStream
2. Create a DataStream from a source
3. Apply transformations on the DataStream
4. Output the results
5. Execute the streaming program
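A minimal sketch of the five steps; the host/port and the trivial transformation here are placeholders, not part of the demos below:

import org.apache.flink.streaming.api.scala._

object FiveStepSkeleton {
  def main(args: Array[String]): Unit = {
    // Step 1. Obtain the execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Step 2. Create a DataStream from a source (a socket, as in the demos below)
    val lines: DataStream[String] = env.socketTextStream("localhost", 9999)
    // Step 3. Apply a transformation
    val lengths: DataStream[(String, Int)] = lines.map(line => (line, line.length))
    // Step 4. Output the result (print to stdout)
    lengths.print()
    // Step 5. Trigger execution
    env.execute("five-step skeleton")
  }
}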
4. Program Notes
Setup:
IDE: IntelliJ IDEA Community Edition (from JetBrains)
Language: Scala 2.10
Runtime: Flink 1.0.3 cluster (1 JobManager + 2 TaskManagers)
Job submission: client CLI
Build tool: Maven 3.3.9
5. Demo: Experiencing Event Time
Key points:
Set the Event Time characteristic:
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
Override the map method to implement a slightly more complex map transformation:
map(new EventTimeFunction)
Assign the timestamp and the watermark:
val timeValue = parsedStream.assignAscendingTimestamps(_._2)
Running keyBy on the DataStream produces a KeyedStream, and running window on the KeyedStream produces a WindowedStream. The function applied to a WindowedStream sees three things: K => the key, W => the window, and T => Iterable[(…)], i.e. the collection of each key's elements within a given window. For how the different stream types relate to one another, see the references and the type-annotated sketch below:
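To make the stream-type transitions explicit, here is a sketch with the intermediate types written out; it assumes the timeValue stream from the full program below, and the explicit type ascriptions are for illustration only:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// DataStream -> KeyedStream: partition by the first tuple field (szCode)
val keyed: KeyedStream[(Long, Long, String, Long), Long] = timeValue.keyBy(_._1)

// KeyedStream -> WindowedStream[T, K, W]: group each key's elements into 1-minute event-time windows
val windowed: WindowedStream[(Long, Long, String, Long), Long, TimeWindow] =
  keyed.window(TumblingEventTimeWindows.of(Time.minutes(1)))

// WindowedStream -> DataStream: aggregating (here, sum on field index 3) yields a plain stream again
val summed: DataStream[(Long, Long, String, Long)] = windowed.sum(3)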
Aggregate with sum on the WindowedStream:
val sumVolumePerMinute = timeValue.keyBy(_._1)
  .window(TumblingEventTimeWindows.of(Time.minutes(1)))
  .sum(3)
  .name("sum volume per minute")
Run the program. Test input:
600000.SH,600000,20160520,93000960,1,39,173200,400,800,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93059000,1,39,173200,200,1000,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93101000,1,39,173200,300,1200,66,0,0,62420,76334,93002085
......
The output:
(60000,20160520093000960,600)
(60000,20160520093101000,300)
...
As you can see, the results are computed per Event Time window, independent of the system clock (and of how quickly the input is typed): the 400-share and 200-share trades both fall in the 09:30 minute (400 + 200 = 600), while the 300-share trade falls in 09:31.
The complete code for the Event Time test:
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

/**
 * A simple Flink DataStream program that computes the cumulative trade volume per minute.
 * source: a SocketStream simulating consumption from Kafka
 * sink: print directly to local output; sinking to HDFS and writing to Redis are left for later
 * Key points:
 * 1. Use EventTime, not the system clock (processing time), to aggregate volume per minute
 * 2. Merge the input date and time fields into a Long millisecond value and use it to generate
 *    the Timestamp and WaterMark
 * 3. Use TumblingEventTimeWindows, i.e. tumbling windows aggregating over non-overlapping ranges
 */
object TransactionSumVolume1 {

  case class Transaction(szWindCode: String, szCode: Long, nAction: String, nTime: String,
                         seq: Long, nIndex: Long, nPrice: Long, nVolume: Long, nTurnover: Long,
                         nBSFlag: Int, chOrderKind: String, chFunctionCode: String,
                         nAskOrder: Long, nBidOrder: Long, localTime: Long)

  def main(args: Array[String]): Unit = {
    /** When running the program, you should pass 2 parameters: hostname and port of the socket */
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }
    val hostName = args(0)
    val port = args(1).toInt

    /**
     * Step 1. Obtain an execution environment for DataStream operations
     * and set EventTime instead of Processing Time.
     */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    /** Step 2. Create a DataStream from the socket */
    val input = env.socketTextStream(hostName, port)

    /** Step 3. Implement the per-minute-volume logic */

    /** Parse the input stream with a class that implements the MapFunction */
    val parsedStream = input.map(new EventTimeFunction)

    /** Assign the Timestamp and WaterMark for event time (the timestamp field must be a Long) */
    val timeValue = parsedStream.assignAscendingTimestamps(_._2)

    val sumVolumePerMinute = timeValue.keyBy(_._1)
      .window(TumblingEventTimeWindows.of(Time.minutes(1)))
      .sum(3)
      .name("sum volume per minute")

    /** Step 4. Sink the final result to standard output (the .out file) */
    sumVolumePerMinute.map(value => (value._1, value._3, value._4)).print()

    /** Step 5. Program execution */
    env.execute("SocketTextStream for sum of volume Example")
  }

  class EventTimeFunction extends MapFunction[String, (Long, Long, String, Long)] {
    def map(s: String): (Long, Long, String, Long) = {
      val columns = s.split(",")
      val transaction: Transaction = Transaction(columns(0), columns(1).toLong, columns(2),
        columns(3), columns(4).toLong, columns(5).toLong, columns(6).toLong, columns(7).toLong,
        columns(8).toLong, columns(9).toInt, columns(10), columns(11),
        columns(12).toLong, columns(13).toLong, columns(14).toLong)
      val format = new SimpleDateFormat("yyyyMMddHHmmssSSS")
      val volume: Long = transaction.nVolume
      val szCode: Long = transaction.szCode
      if (transaction.nTime.length == 8) {
        // pad a leading '0' so date + time always forms 17 digits (yyyyMMdd + HHmmssSSS)
        val eventTimeString = transaction.nAction + '0' + transaction.nTime
        val eventTime: Long = format.parse(eventTimeString).getTime
        (szCode, eventTime, eventTimeString, volume)
      } else {
        val eventTimeString = transaction.nAction + transaction.nTime
        val eventTime: Long = format.parse(eventTimeString).getTime
        (szCode, eventTime, eventTimeString, volume)
      }
    }
  }
}
6. Demo: Experiencing Processing Time
Key points:
Use a TumblingProcessingTimeWindow. Since the default time characteristic is already Processing Time, it does not need to be set explicitly; when assigning the window, the built-in timeWindow shorthand is enough (an equivalent spelled-out form follows the snippet):
timeWindow(Time.seconds(15))
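Under the ProcessingTime characteristic, this shorthand resolves to the tumbling processing-time window assigner, so the following is equivalent; a sketch assuming the input stream and the VwapField mapper from the full program below:

import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// equivalent to .timeWindow(Time.seconds(15)) when the time characteristic is ProcessingTime
val windowed = input.map(new VwapField)
  .keyBy(_._1)
  .window(TumblingProcessingTimeWindows.of(Time.seconds(15)))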
After windowing, the WindowedStream must be aggregated to produce a new DataStream. The system provides sum, reduce, fold, and similar operations, but when the computation inside the window is more complex, you need the apply {...} method. The variants of windowedStream.apply are defined in the source file org.apache.flink.streaming.api.scala.WindowedStream.scala, which provides 6 different overloads; see WindowedStream.scala for details (a paraphrased sketch of two of them follows).
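Roughly, the two most commonly used overloads look like this (a paraphrased sketch, not the exact source; see WindowedStream.scala for the precise six signatures, including the pre-aggregating reduce/fold variants):

// 1. Pass a full WindowFunction implementation:
def apply[R: TypeInformation: ClassTag](function: WindowFunction[T, R, K, W]): DataStream[R]

// 2. Pass a function literal over (key, window, elements, collector), as the demo below does:
def apply[R: TypeInformation: ClassTag](
    function: (K, W, Iterable[T], Collector[R]) => Unit): DataStream[R]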
This test calls apply on the windowedStream to implement a slightly more involved computation:
.apply { (k: Long, w: TimeWindow, T: Iterable[(Long, Long, Long)],
          out: Collector[(Long, String, String, Double)]) =>
  var sumVolume: Long = 0
  var sumTurnover: Long = 0
  for (elem <- T) {
    sumVolume = sumVolume + elem._2
    sumTurnover = sumTurnover + elem._3
  }
  val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
  val vwap: Double = (BigDecimal(String.valueOf(sumTurnover)) / BigDecimal(String.valueOf(sumVolume)))
    .setScale(2, BigDecimal.RoundingMode.HALF_UP)
    .toDouble
  out.collect((k, format.format(w.getStart), format.format(w.getEnd), vwap))
}
Run the program. Test input (because this uses Processing Time, mind the timing of your input: records entered more than 15 seconds apart land in different windows; here the first 2 records were entered together and the 3rd after a pause):
600000.SH,600000,20160520,93000960,1,39,173200,400,800,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93059000,1,39,173200,200,1000,66,0,0,62420,76334,93002085
600000.SH,600000,20160520,93101000,1,39,173200,300,1200,66,0,0,62420,76334,93002085
......
The results:
(60000,2016-06-16 17:56:00.000,2016-06-16 17:56:15.000,3.0)
(60000,2016-06-16 17:58:15.000,2016-06-16 17:58:30.000,4.0)
As you can see, the result has nothing to do with the events' own timestamps, only with when the system processed them. The first two records landed in the same 15-second window, giving VWAP = (800 + 1000) / (400 + 200) = 3.0; the third landed in a later window, giving 1200 / 300 = 4.0.
The complete code:
import java.text.SimpleDateFormat
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

/**
 * A Flink DataStream program that computes the volume-weighted average price (VWAP)
 * over 15-second windows.
 * source: a SocketStream simulating consumption from Kafka
 * sink: print directly to local output; sinking to HDFS and writing to Redis are left for later
 * Key points:
 * 1. Use the default Processing Time to compute the VWAP every 15 seconds
 * 2. Use TumblingProcessingTimeWindows, i.e. tumbling windows on the system clock that
 *    aggregate over non-overlapping ranges
 * 3. Run a custom apply algorithm (the VWAP) on the WindowedStream instead of a simple Aggregation
 */
object TransactionVWap {

  case class Transaction(szWindCode: String, szCode: Long, nAction: String, nTime: String,
                         seq: Long, nIndex: Long, nPrice: Long, nVolume: Long, nTurnover: Long,
                         nBSFlag: Int, chOrderKind: String, chFunctionCode: String,
                         nAskOrder: Long, nBidOrder: Long, localTime: Long)

  def main(args: Array[String]): Unit = {
    /** When running the program, you should pass 2 parameters: hostname and port of the socket */
    if (args.length != 2) {
      System.err.println("USAGE:\nSocketTextStreamWordCount <hostname> <port>")
      return
    }
    val hostName = args(0)
    val port = args(1).toInt

    /** Step 1. Obtain an execution environment for DataStream operations */
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Processing time is also the default TimeCharacteristic
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

    /** Step 2. Create a DataStream from the socket */
    val input = env.socketTextStream(hostName, port)

    /**
     * Step 3. Implement the VWAP-per-15-seconds logic
     * Note: the windowed apply function sees 3 things: T => elements, K => key, W => window
     */
    val sumVolumePerMinute = input
      // transform each line into a tuple (szCode, volume, turnover)
      .map(new VwapField)
      // partition by szCode
      .keyBy(_._1)
      // build tumbling windows of 15 seconds
      .timeWindow(Time.seconds(15))
      // compute the VWAP within each window
      .apply { (k: Long, w: TimeWindow, T: Iterable[(Long, Long, Long)],
                out: Collector[(Long, String, String, Double)]) =>
        var sumVolume: Long = 0
        var sumTurnover: Long = 0
        for (elem <- T) {
          sumVolume = sumVolume + elem._2
          sumTurnover = sumTurnover + elem._3
        }
        val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
        val vwap: Double = (BigDecimal(String.valueOf(sumTurnover)) / BigDecimal(String.valueOf(sumVolume)))
          .setScale(2, BigDecimal.RoundingMode.HALF_UP)
          .toDouble
        out.collect((k, format.format(w.getStart), format.format(w.getEnd), vwap))
      }.name("VWAP per 15 seconds")

    /** Step 4. Sink the final result to standard output (the .out file) */
    sumVolumePerMinute.print()

    /** Step 5. Program execution */
    env.execute("SocketTextStream for sum of volume Example")
  }

  class VwapField extends MapFunction[String, (Long, Long, Long)] {
    def map(s: String): (Long, Long, Long) = {
      val columns = s.split(",")
      val transaction: Transaction = Transaction(columns(0), columns(1).toLong, columns(2),
        columns(3), columns(4).toLong, columns(5).toLong, columns(6).toLong, columns(7).toLong,
        columns(8).toLong, columns(9).toInt, columns(10), columns(11),
        columns(12).toLong, columns(13).toLong, columns(14).toLong)
      val volume: Long = transaction.nVolume
      val szCode: Long = transaction.szCode
      val turnover: Long = transaction.nTurnover
      (szCode, volume, turnover)
    }
  }
}
7. Summary
Whether to use Event Time or Processing Time depends on the specific business scenario.
For late elements under Event Time, try feeding in out-of-order input yourself and see what happens.
In a real job, custom functions and operators should be stateful so they can recover from failures; for simplicity, no state is set up here.
References
1. https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-101
2. https://www.oreilly.com/ideas/the-world-beyond-batch-streaming-102
3. https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/index.html
4. https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/event_time.html
5. https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming/windows.html
6. http://blog.madhukaraphatak.com/introduction-to-flink-streaming-part-9/
7. http://www.cnblogs.com/fxjwind/p/5434572.html
8. https://github.com/apache/flink/
9. http://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams/
10. http://data-artisans.com/blog/
11. http://dataartisans.github.io/flink-training/