Flume's Avro Sink and Avro Source, Part 1: Avro Source
Question: What RPC service does Avro Source provide, and how does it provide it?
Question 1.1: How does the Flume source start a Netty server to provide its RPC service?
The avro-rpc-quickstart project on GitHub shows that a NettyServer serving a specific RPC protocol can be started as follows. Does the Flume source provide its RPC service the same way?
server = new NettyServer(new SpecificResponder(Mail.class, new MailImpl()), new InetSocketAddress(65111));
The AvroSource code that creates the NettyServer is:
Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
NioServerSocketChannelFactory socketChannelFactory = initSocketChannelFactory();
ChannelPipelineFactory pipelineFactory = initChannelPipelineFactory();
server = new NettyServer(responder, new InetSocketAddress(bindAddress, port),
    socketChannelFactory, pipelineFactory, null);
So AvroSource also uses Avro's own NettyServer class directly to create a NettyServer. However, it calls a different constructor, one that also takes a ChannelFactory and a ChannelPipelineFactory.
So what kind of ChannelFactory does AvroSource use?
initSocketChannelFactory() is implemented as:
private NioServerSocketChannelFactory initSocketChannelFactory() {
  NioServerSocketChannelFactory socketChannelFactory;
  if (maxThreads <= 0) {
    // no "threads" limit: boss and worker executors are both cached (unbounded) pools
    socketChannelFactory = new NioServerSocketChannelFactory(
        Executors.newCachedThreadPool(), Executors.newCachedThreadPool());
  } else {
    // worker executor capped at maxThreads threads
    socketChannelFactory = new NioServerSocketChannelFactory(
        Executors.newCachedThreadPool(), Executors.newFixedThreadPool(maxThreads));
  }
  return socketChannelFactory;
}
So the ChannelFactory is specified in order to apply AvroSource's "threads" parameter (maxThreads) as the maximum number of worker threads. That number is the upper bound on how many threads handle RPC requests; if it is not set to a positive value, the worker pool is unbounded.
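To see what the "threads" setting changes, here is a JDK-only model of that branch. It does not use Netty: `WorkerPoolSketch`, `workerExecutor` and `peakConcurrency` are hypothetical names, and the returned pool simply stands in for the worker executor handed to `NioServerSocketChannelFactory`. It shows that a cached pool has no upper bound on its size, while a fixed pool never runs more than `maxThreads` tasks at once. How Netty maps its worker objects onto that executor is not modelled here.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class WorkerPoolSketch {
    // Mirrors the branch in initSocketChannelFactory(): no limit -> cached pool,
    // otherwise a fixed pool with at most maxThreads threads.
    static ExecutorService workerExecutor(int maxThreads) {
        return maxThreads <= 0
                ? Executors.newCachedThreadPool()
                : Executors.newFixedThreadPool(maxThreads);
    }

    // Runs `tasks` short jobs and returns the highest number that ran at the same time.
    static int peakConcurrency(int maxThreads, int tasks) {
        ExecutorService pool = workerExecutor(maxThreads);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(50); // simulate handling one request
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                running.decrementAndGet();
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor fixed = (ThreadPoolExecutor) workerExecutor(3);
        System.out.println("fixed(3) max pool size: " + fixed.getMaximumPoolSize());
        fixed.shutdown();
        System.out.println("peak with maxThreads=3: " + peakConcurrency(3, 12));
    }
}
```

The peak is measured rather than assumed, so with enough tasks it shows the cap in action; with a cached pool the same run would simply start more threads.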
See the Javadoc of NioServerSocketChannelFactory:
A ServerSocketChannelFactory which creates a server-side NIO-based ServerSocketChannel. It utilizes the non-blocking I/O mode which was introduced with NIO to serve many number of concurrent connections efficiently.
How threads work
There are two types of threads in a NioServerSocketChannelFactory; one is boss thread and the other is worker thread.
Boss threads
Each bound ServerSocketChannel has its own boss thread. For example, if you opened two server ports such as 80 and 443, you will have two boss threads. A boss thread accepts incoming connections until the port is unbound. Once a connection is accepted successfully, the boss thread passes the accepted Channel to one of the worker threads that the NioServerSocketChannelFactory manages.
Worker threads
One NioServerSocketChannelFactory can have one or more worker threads. A worker thread performs non-blocking read and write for one or more Channels in a non-blocking mode.
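The boss/worker split described in the Javadoc can be modelled with plain JDK executors. In this sketch (all names hypothetical) `accept` plays the boss, handing each new connection to a worker in round-robin order (an assumption about how the worker is picked), and each worker is a single named thread, so every read for a given connection lands on the same thread. Real Netty workers multiplex many channels with a `Selector` and non-blocking I/O; that part is not modelled.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class BossWorkerSketch {
    private final ExecutorService[] workers;              // each worker is one thread
    private final AtomicInteger nextWorker = new AtomicInteger();
    private final Map<Integer, Integer> owner = new ConcurrentHashMap<>(); // connection -> worker index

    BossWorkerSketch(int workerCount) {
        workers = new ExecutorService[workerCount];
        for (int i = 0; i < workerCount; i++) {
            String name = "worker-" + i;
            workers[i] = Executors.newSingleThreadExecutor(r -> {
                Thread t = new Thread(r, name);
                t.setDaemon(true); // sketch only: don't keep the JVM alive
                return t;
            });
        }
    }

    // Boss role: accept a connection and bind it to one worker, round-robin.
    int accept(int connectionId) {
        int w = Math.floorMod(nextWorker.getAndIncrement(), workers.length);
        owner.put(connectionId, w);
        return w;
    }

    // Worker role: every read for a connection runs on the worker that owns it.
    // Returns the name of the thread that handled the read.
    String read(int connectionId) {
        Integer w = owner.get(connectionId);
        if (w == null) throw new IllegalStateException("connection not accepted: " + connectionId);
        try {
            return workers[w].submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException | ExecutionException e) {
            throw new RuntimeException(e);
        }
    }

    void shutdown() {
        for (ExecutorService worker : workers) worker.shutdown();
    }

    public static void main(String[] args) {
        BossWorkerSketch server = new BossWorkerSketch(2);
        for (int conn = 0; conn < 5; conn++) {
            System.out.println("conn " + conn + " -> worker " + server.accept(conn));
        }
        System.out.println("conn 0 read on " + server.read(0)); // worker-0
        System.out.println("conn 3 read on " + server.read(3)); // worker-1
        server.shutdown();
    }
}
```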
What is a ChannelPipelineFactory for, and why does AvroSource customize that too?
The Javadoc of ChannelPipeline says:
A list of ChannelHandlers which handles or intercepts ChannelEvents of a Channel. ChannelPipeline implements an advanced form of the Intercepting Filter pattern to give a user full control over how an event is handled and how the ChannelHandlers in the pipeline interact with each other.
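A stripped-down picture of the Intercepting Filter idea, in plain Java with hypothetical names (`PipelineSketch`, `Handler`, `fire`): handlers run in order, each may transform the event, and any handler can intercept it so later ones never see it. A real ChannelPipeline also has a downstream (outbound) direction and a context object per handler, which this sketch leaves out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Optional;

class PipelineSketch {
    // A handler either passes a (possibly transformed) event on, or intercepts it by
    // returning Optional.empty(), so later handlers never see it.
    interface Handler {
        Optional<String> handle(String event);
    }

    private final List<Handler> handlers = new ArrayList<>();

    PipelineSketch addLast(Handler handler) {
        handlers.add(handler);
        return this;
    }

    // Sends an inbound event through the handlers in order; flatMap skips the
    // remaining handlers once one of them has intercepted the event.
    Optional<String> fire(String event) {
        Optional<String> current = Optional.of(event);
        for (Handler handler : handlers) {
            current = current.flatMap(handler::handle);
        }
        return current;
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch()
                .addLast(e -> Optional.of(e.trim()))
                .addLast(e -> e.isEmpty() ? Optional.empty() : Optional.of(e)) // drop blank events
                .addLast(e -> Optional.of(e.toUpperCase(Locale.ROOT)));
        System.out.println(pipeline.fire("  hello "));  // Optional[HELLO]
        System.out.println(pipeline.fire("   "));       // Optional.empty
    }
}
```

The order of `addLast` calls is the order of processing, which is why composing handlers this way gives full control over how an event is handled.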
So a pipeline is an advanced way of composing interceptors. Let's see which ChannelPipelineFactory AvroSource uses:
private ChannelPipelineFactory initChannelPipelineFactory() {
  ChannelPipelineFactory pipelineFactory;
  boolean enableCompression = compressionType.equalsIgnoreCase("deflate");
  if (enableCompression || enableSsl) {
    pipelineFactory = new SSLCompressionChannelPipelineFactory(enableCompression, enableSsl,
        keystore, keystorePassword, keystoreType);
  } else {
    pipelineFactory = new ChannelPipelineFactory() {
      @Override
      public ChannelPipeline getPipeline() throws Exception {
        return Channels.pipeline();
      }
    };
  }
  return pipelineFactory;
}
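So if compression ("deflate") or SSL is enabled, AvroSource uses SSLCompressionChannelPipelineFactory, a private static nested class of AvroSource. Otherwise its factory returns Channels.pipeline(), which is an empty pipeline with no handlers. That is enough, because Avro's NettyServer appends its own framing and RPC-handling handlers to whatever pipeline the factory returns; the empty pipeline just means that no SSL or compression handlers sit in front of them.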
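The branch above can be mirrored in plain Java. `extraHandlers` returns illustrative labels for what the factory adds (the labels and their order are assumptions, not read from SSLCompressionChannelPipelineFactory), and `deflate`/`inflate` use `java.util.zip` to show what a zlib compression handler pair does to the bytes on the wire. Note that `"deflate".equalsIgnoreCase(compressionType)` is null-safe, unlike the excerpt's `compressionType.equalsIgnoreCase("deflate")`; otherwise the check is the same. All names here are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.Inflater;

class PipelineChoiceSketch {
    // Extra handlers only when compression is "deflate" (case-insensitive) or SSL is on;
    // an empty list corresponds to the plain Channels.pipeline() branch.
    static List<String> extraHandlers(String compressionType, boolean enableSsl) {
        List<String> names = new ArrayList<>();
        if (enableSsl) names.add("ssl");
        if ("deflate".equalsIgnoreCase(compressionType)) {
            names.add("inflater"); // decompresses incoming bytes
            names.add("deflater"); // compresses outgoing bytes
        }
        return names;
    }

    // What a zlib "deflater" handler does to outgoing bytes (zlib format, level 6).
    static byte[] deflate(byte[] input) {
        Deflater deflater = new Deflater(6);
        deflater.setInput(input);
        deflater.finish();
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        while (!deflater.finished()) {
            int n = deflater.deflate(buf);
            out.write(buf, 0, n);
        }
        deflater.end();
        return out.toByteArray();
    }

    // What the matching "inflater" handler does to incoming bytes.
    static byte[] inflate(byte[] input) {
        Inflater inflater = new Inflater();
        inflater.setInput(input);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        byte[] buf = new byte[256];
        try {
            while (!inflater.finished()) {
                int n = inflater.inflate(buf);
                if (n == 0 && (inflater.needsInput() || inflater.needsDictionary())) {
                    throw new IllegalArgumentException("truncated or unsupported zlib data");
                }
                out.write(buf, 0, n);
            }
        } catch (DataFormatException e) {
            throw new IllegalArgumentException("not zlib data", e);
        } finally {
            inflater.end();
        }
        return out.toByteArray();
    }

    public static void main(String[] args) {
        System.out.println(extraHandlers("none", false));    // []
        System.out.println(extraHandlers("deflate", true));  // [ssl, inflater, deflater]
        byte[] body = "hello hello hello hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(Arrays.equals(body, inflate(deflate(body)))); // true
    }
}
```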
Question 1.2: The server is now up, but what RPC service does it actually provide?
The key is this line:
Responder responder = new SpecificResponder(AvroSourceProtocol.class, this);
The Avro API docs show that SpecificResponder's two arguments are a protocol and an implementation of that protocol. So AvroSource itself must implement AvroSourceProtocol, and indeed its declaration is:
public class AvroSource extends AbstractSource implements EventDrivenSource, Configurable, AvroSourceProtocol
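Conceptually, a responder receives a call by message name and forwards it to the matching method of the implementation object. The sketch below is a reflection-only stand-in for that dispatch, with no Avro serialization or wire format; `ResponderSketch`, `GreeterProtocol` and `GreeterSource` are hypothetical, playing the roles of SpecificResponder, AvroSourceProtocol and AvroSource respectively.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

class ResponderSketch<T> {
    private final Class<T> protocol; // the interface that defines the callable messages
    private final T impl;            // the object that actually does the work

    ResponderSketch(Class<T> protocol, T impl) {
        if (!protocol.isInstance(impl)) {
            throw new IllegalArgumentException("impl does not implement " + protocol.getName());
        }
        this.protocol = protocol;
        this.impl = impl;
    }

    // Looks the message name up on the protocol interface and invokes it on the implementation.
    Object respond(String messageName, Object... args) {
        for (Method m : protocol.getMethods()) {
            if (m.getName().equals(messageName) && m.getParameterCount() == args.length) {
                try {
                    return m.invoke(impl, args);
                } catch (IllegalAccessException | InvocationTargetException e) {
                    throw new RuntimeException("call to " + messageName + " failed", e);
                }
            }
        }
        throw new IllegalArgumentException("unknown message: " + messageName);
    }

    public static void main(String[] args) {
        ResponderSketch<GreeterProtocol> responder =
                new ResponderSketch<>(GreeterProtocol.class, new GreeterSource());
        System.out.println(responder.respond("greet", "flume")); // hello flume
    }
}

// Hypothetical protocol, standing in for AvroSourceProtocol.
interface GreeterProtocol {
    String greet(String name);
}

// Hypothetical implementation, standing in for AvroSource (which passes `this`).
class GreeterSource implements GreeterProtocol {
    @Override
    public String greet(String name) {
        return "hello " + name;
    }
}
```

Passing `this` as the implementation, as AvroSource does, just means the source object is both the server owner and the target of every dispatched call.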
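Let's look at how AvroSourceProtocol is defined. It lives in the flume-ng-sdk project under src/main/avro, in the file flume.avdl. An .avdl file defines a protocol in Avro IDL; placing it in that particular directory follows the avro-maven-plugin convention.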
The avdl reads:
@namespace("org.apache.flume.source.avro")
protocol AvroSourceProtocol {
  enum Status {
    OK, FAILED, UNKNOWN
  }
  record AvroFlumeEvent {
    map<string> headers;
    bytes body;
  }
  Status append( AvroFlumeEvent event );
  Status appendBatch( array<AvroFlumeEvent> events );
}
It defines an enum, Status, used as the return value of append and appendBatch. It reports how the source handled the incoming events, with three possible values: OK, FAILED and UNKNOWN.
It defines a record type, AvroFlumeEvent, matching Flume's definition of an Event: headers is a set of key-value pairs (a map), and body is a byte array.
It defines two methods: append, which sends a single AvroFlumeEvent, and appendBatch, which sends a batch of AvroFlumeEvents.
From this avdl, Avro generates three Java files: the enum Status, the class AvroFlumeEvent, and the interface AvroSourceProtocol. AvroSource implements the AvroSourceProtocol interface and thereby exposes two remote methods, append and appendBatch.
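For orientation, here are hand-written, simplified stand-ins for the three generated types, using the same names. They are not the avro-generated code (which also carries the schema and serialization support); the field types follow what the getters are described as returning further down. `RecordingSource` is a hypothetical implementation that plays the role AvroSource plays, except that it only collects events.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for the generated enum.
enum Status { OK, FAILED, UNKNOWN }

// Simplified stand-in for the generated record: map<string> headers, bytes body.
class AvroFlumeEvent {
    private final Map<CharSequence, CharSequence> headers;
    private final ByteBuffer body;

    AvroFlumeEvent(Map<CharSequence, CharSequence> headers, ByteBuffer body) {
        this.headers = headers;
        this.body = body;
    }

    Map<CharSequence, CharSequence> getHeaders() { return headers; }
    ByteBuffer getBody() { return body; }
}

// Simplified stand-in for the generated protocol interface.
interface AvroSourceProtocol {
    Status append(AvroFlumeEvent event);
    Status appendBatch(List<AvroFlumeEvent> events);
}

// Hypothetical in-memory implementation that just records what it receives.
class RecordingSource implements AvroSourceProtocol {
    final List<AvroFlumeEvent> received = new ArrayList<>();

    @Override
    public Status append(AvroFlumeEvent event) {
        received.add(event);
        return Status.OK;
    }

    @Override
    public Status appendBatch(List<AvroFlumeEvent> events) {
        received.addAll(events);
        return Status.OK;
    }

    public static void main(String[] args) {
        Map<CharSequence, CharSequence> headers = new HashMap<>();
        headers.put("host", "web01");
        AvroFlumeEvent event = new AvroFlumeEvent(headers,
                ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));
        RecordingSource source = new RecordingSource();
        System.out.println(source.append(event));   // OK
        System.out.println(source.received.size()); // 1
    }
}
```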
The append method is implemented as:
@Override
public Status append(AvroFlumeEvent avroEvent) {
  logger.debug("Avro source {}: Received avro event: {}", getName(), avroEvent);
  sourceCounter.incrementAppendReceivedCount();
  sourceCounter.incrementEventReceivedCount();
  Event event = EventBuilder.withBody(avroEvent.getBody().array(),
      toStringMap(avroEvent.getHeaders()));
  try {
    getChannelProcessor().processEvent(event);
  } catch (ChannelException ex) {
    logger.warn("Avro source " + getName() + ": Unable to process event. " +
        "Exception follows.", ex);
    return Status.FAILED;
  }
  sourceCounter.incrementAppendAcceptedCount();
  sourceCounter.incrementEventAcceptedCount();
  return Status.OK;
}
The method converts the received AvroFlumeEvent into a Flume Event. The conversion only bridges mismatched data types: avroEvent.getBody() returns a ByteBuffer, which is turned into a byte[], and avroEvent.getHeaders() returns a Map<CharSequence, CharSequence>, which toStringMap turns into the Map<String, String> that EventBuilder.withBody expects.
Once the Event is built, it is handed to this source's ChannelProcessor. If processing throws a ChannelException, append returns Status.FAILED; otherwise it returns Status.OK.
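The two conversions can be sketched with the JDK alone. `toStringMap` here is a hypothetical reimplementation (the excerpt calls a helper of that name without showing it), and `toBytes` copies only the bytes between the buffer's position and limit instead of calling `array()`, which returns the entire backing array; for a buffer created with `ByteBuffer.wrap(bytes)` both give the same result. Headers arrive as `CharSequence` because Avro represents strings with its own `CharSequence` type (`Utf8`) by default.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

class EventConversionSketch {
    // CharSequence -> String for every header key and value.
    static Map<String, String> toStringMap(Map<CharSequence, CharSequence> charSeqMap) {
        Map<String, String> stringMap = new HashMap<>();
        for (Map.Entry<CharSequence, CharSequence> entry : charSeqMap.entrySet()) {
            stringMap.put(entry.getKey().toString(), entry.getValue().toString());
        }
        return stringMap;
    }

    // ByteBuffer -> byte[], copying position..limit without moving the caller's buffer.
    static byte[] toBytes(ByteBuffer buffer) {
        byte[] bytes = new byte[buffer.remaining()];
        buffer.duplicate().get(bytes);
        return bytes;
    }

    public static void main(String[] args) {
        Map<CharSequence, CharSequence> headers = new HashMap<>();
        headers.put(new StringBuilder("host"), new StringBuilder("web01")); // any CharSequence works
        System.out.println(toStringMap(headers)); // {host=web01}
        ByteBuffer body = ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(toBytes(body), StandardCharsets.UTF_8)); // hello
    }
}
```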
appendBatch is implemented very similarly to append.
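Since only the similarity to append is stated, here is a minimal sketch of that pattern under stated assumptions: every incoming event is converted the same way as in append, the whole list is handed over in a single call, and any failure turns into Status.FAILED for the entire batch. The batch hook is a plain `Consumer` standing in for the ChannelProcessor's batch method (`processEventBatch`), and catching `RuntimeException` stands in for the `ChannelException` that append catches. All class names here are hypothetical.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

class AppendBatchSketch {
    enum Status { OK, FAILED, UNKNOWN }

    // Wire-side event, shaped like the generated AvroFlumeEvent.
    static final class WireEvent {
        final Map<CharSequence, CharSequence> headers;
        final ByteBuffer body;
        WireEvent(Map<CharSequence, CharSequence> headers, ByteBuffer body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Flume-side event: String headers and a byte[] body.
    static final class Event {
        final Map<String, String> headers;
        final byte[] body;
        Event(Map<String, String> headers, byte[] body) {
            this.headers = headers;
            this.body = body;
        }
    }

    // Hypothetical stand-in for the ChannelProcessor's batch entry point; it may throw.
    private final Consumer<List<Event>> processEventBatch;

    AppendBatchSketch(Consumer<List<Event>> processEventBatch) {
        this.processEventBatch = processEventBatch;
    }

    Status appendBatch(List<WireEvent> events) {
        // 1. Convert every wire event, the same way append() converts one.
        List<Event> batch = new ArrayList<>(events.size());
        for (WireEvent wire : events) {
            Map<String, String> headers = new HashMap<>();
            wire.headers.forEach((k, v) -> headers.put(k.toString(), v.toString()));
            byte[] body = new byte[wire.body.remaining()];
            wire.body.duplicate().get(body);
            batch.add(new Event(headers, body));
        }
        // 2. Hand the whole batch over in one call; any failure fails the batch.
        try {
            processEventBatch.accept(batch);
        } catch (RuntimeException ex) {
            return Status.FAILED;
        }
        return Status.OK;
    }

    public static void main(String[] args) {
        Map<CharSequence, CharSequence> headers = new HashMap<>();
        headers.put("host", "web01");
        List<WireEvent> events = new ArrayList<>();
        events.add(new WireEvent(headers, ByteBuffer.wrap(new byte[] {1, 2})));
        events.add(new WireEvent(headers, ByteBuffer.wrap(new byte[] {3})));
        AppendBatchSketch ok = new AppendBatchSketch(b -> System.out.println("got " + b.size() + " events"));
        System.out.println(ok.appendBatch(events));   // prints "got 2 events", then OK
        AppendBatchSketch full = new AppendBatchSketch(b -> { throw new IllegalStateException("channel full"); });
        System.out.println(full.appendBatch(events)); // FAILED
    }
}
```

Handing the list over in one call, rather than calling the single-event path in a loop, is what lets the receiving side treat the batch as one unit in this sketch.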
Reposted from: https://www.cnblogs.com/devos/p/3617764.html