SparkRPC源码分析之RPC管道与消息类型
SparkRPC源碼分析之RPC管道與消息類型
我們前面看過了netty基礎知識掃盲,那我們應該明白,ChannelHandler這個組件內為channel的各種事件提供了處理邏輯,也就是主要業務邏輯寫在該組建內。Spark的RPC也不會例外,因此我們看一下Spark的Handler怎么調用的。在TransPortClientFactory初始化客戶端之前有一條代碼為TransportChannelHandler clientHandler = context.initializePipeline(ch);這里的context定義的地方為private final TransportContext context;也就時我們接下來看TransoprtContext類的方法,代碼如下
public TransportChannelHandler initializePipeline(SocketChannel channel) {
return initializePipeline(channel, rpcHandler);
}
1
2
3
可以看到這里的initializePipeline調用了另一個initializePipeline方法,它的代碼如下
public TransportChannelHandler initializePipeline(
SocketChannel channel,
RpcHandler channelRpcHandler) {
try {
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
channel.pipeline()
.addLast("encoder", ENCODER)
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
.addLast("decoder", DECODER)
.addLast("idleStateHandler", new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
// NOTE: Chunks are currently guaranteed to be returned in the order of request, but this
// would require more logic to guarantee if this were not part of the same event loop.
.addLast("handler", channelHandler);
return channelHandler;
} catch (RuntimeException e) {
logger.error("Error while initializing Netty pipeline", e);
throw e;
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
這里面和我們前面netty基礎知識掃盲里面做的內容很類似,就是給pipeline啟動添加了一些Handler處理邏輯
通過addLast添加的Handler會被依次執行順序或倒序,那么我們就來依次看一些他的Handler都干了什么。
addLast(“encoder”, ENCODER)
服務器端用來編碼服務器到客戶端響應的編碼器。通過調用消息的encode()方法對其進行編碼。對于非數據消息,將添加一個ByteBuf到“out”,其中包含總幀長度、消息類型和消息本身。在ChunkFetchSuccess的情況下,我們還將與數據對應的ManagedBuffer添加到“Out”,以便啟用零拷貝傳輸。一般會出現的消息類型如下
0 ChunkFetchRequest;
1 ChunkFetchSuccess;
2 ChunkFetchFailure;
3 RpcRequest;
4 RpcResponse;
5 RpcFailure;
6 StreamRequest;
7 StreamResponse;
8 StreamFailure;
9 OneWayMessage;
1 UploadStream;
-1 User
1
2
3
4
5
6
7
8
9
10
11
12
.addLast(TransportFrameDecoder.HANDLER_NAME, NettyUtils.createFrameDecoder())
- 一種允許截取原始數據的定制幀解碼器。
- 類似于Netty的幀解碼器(具有符合此庫需要的硬編碼參數)但是不同的是它在封裝成幀之前允許去裝攔截器直接去讀取數據
- 與Netty的幀解碼器不同,每個幀在解碼后立即被發送給子處理程序,而不是盡可能多地放入當前緩沖區一次性發出去。這允許子處理程序在需要時安裝攔截器。
- 如果安裝了攔截器,則停止封裝成幀,數據將直接輸入攔截器,當攔截器指示它不需要讀取任何更多數據時,封裝恢復
.addLast(“decoder”, DECODER)
客戶端用來解碼服務器到客戶端響應的解碼器。消息類型和加密端一樣不再重復寫了
.addLast(“idleStateHandler”, new IdleStateHandler(0, 0, conf.connectionTimeoutMs() / 1000))
在服務器和客戶端之間一定時間內沒有數據交互時, 即處于 idle【空閑】 狀態時, 客戶端或服務器會發送一個特殊的數據包給對方, 當接收方收到這個數據報文后, 也立即發送一個特殊的數據報文, 回應發送方, 此即一個 PING-PONG 交互,確保TCP連接有效
.addLast(“handler”, channelHandler);
channelHandler的創建代碼如下
TransportChannelHandler channelHandler = createChannelHandler(channel, channelRpcHandler);
1
createChannelHandler代碼如下
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
TransportClient client = new TransportClient(channel, responseHandler);
TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
rpcHandler, conf.maxChunksBeingTransferred());
return new TransportChannelHandler(client, responseHandler, requestHandler,
conf.connectionTimeoutMs(), closeIdleConnections);
}
1
2
3
4
5
6
7
8
值得注意的一點,我們可以看到這里面有客戶端的初始化new TransportClient(channel, responseHandler);也許大家會有疑惑,我們前面才看了代碼TransportClientFactory中有初始化TransportClient的代碼,怎么這里也有呢?
這里分析一下TransportClientFactory中創建TransportClient時的情況,可以看到代碼如下
final AtomicReference<TransportClient> clientRef = new AtomicReference<>();
final AtomicReference<Channel> channelRef = new AtomicReference<>();
bootstrap.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
TransportChannelHandler clientHandler = context.initializePipeline(ch);
clientRef.set(clientHandler.getClient());
channelRef.set(ch);
}
});
…… 省略掉一部分代碼
TransportClient client = clientRef.get();
…… 省略掉一部分代碼
return client;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
可以看到這里的客戶端與其說時創建不如說是獲取,從clientHandler中獲取,這么看來,客戶端真正的創建的地方是在new關鍵字出現的地方,也就是這里,而TransportClientFactory中的創建不過是從這邊取到的而已。
接著看TransportChannelHandler這個類到底為何方神圣?
從類圖上可以看出來,這個類實現了ChannelInboundHandler接口,那么這個接口是干什么的呢?
ChannelInboundHandler是一個netty的組件,它是一個常用的Handler。這個Handler的作用就是處理接收到數據時的事件,我們的業務邏輯一般就是寫在這個Handler里面。
這個TransportChannelHandler的處理業務邏輯是什么呢?看下面代碼可知它重寫了channelRead方法
@Override
public void channelRead(ChannelHandlerContext ctx, Object request) throws Exception {
if (request instanceof RequestMessage) {
requestHandler.handle((RequestMessage) request);
} else if (request instanceof ResponseMessage) {
responseHandler.handle((ResponseMessage) request);
} else {
ctx.fireChannelRead(request);
}
}
1
2
3
4
5
6
7
8
9
10
這里它主要判斷請求是什么類型的數據,根據類型交給TransportResponseHandler或者TransportRequestHandler的對象去處理。
這里可以看出無論是TransportRequestHandler還是TransportResponseHandler都是繼承于MessageHandler抽象類。
那么我們就來看一下MessageHandler,看一下他的方法發現上面調用的handle方法都是來自于重寫該類的方法.
public abstract class MessageHandler<T extends Message> {
//處理單個消息的接收。
public abstract void handle(T message) throws Exception;
//當MessageHandler所在的通道處于活動狀態時調用
public abstract void channelActive();
//在通道上捕獲異常時調用
public abstract void exceptionCaught(Throwable cause);
//當MessageHandler所在的通道處于非活動狀態時調用
public abstract void channelInactive();
}
1
2
3
4
5
6
7
8
9
10
那么我們先看TransportRequestHandler重寫的handle方法
@Override
public void handle(RequestMessage request) {
if (request instanceof ChunkFetchRequest) {
processFetchRequest((ChunkFetchRequest) request);
} else if (request instanceof RpcRequest) {
processRpcRequest((RpcRequest) request);
} else if (request instanceof OneWayMessage) {
processOneWayMessage((OneWayMessage) request);
} else if (request instanceof StreamRequest) {
processStreamRequest((StreamRequest) request);
} else if (request instanceof UploadStream) {
processStreamUpload((UploadStream) request);
} else {
throw new IllegalArgumentException("Unknown request type: " + request);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
可以看出,在這里用了if-else邏輯判斷消息的類型,然后再交給相應的方法去處理。那么一共有多少消息呢?它都可以處理什么消息呢?請看類圖
那么現在再看一下TransportResponseHandler它復寫的handle方法邏輯如下
@Override
public void handle(ResponseMessage message) throws Exception {
if (message instanceof ChunkFetchSuccess) {
ChunkFetchSuccess resp = (ChunkFetchSuccess) message;
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} since it is not outstanding",
resp.streamChunkId, getRemoteAddress(channel));
resp.body().release();
} else {
outstandingFetches.remove(resp.streamChunkId);
listener.onSuccess(resp.streamChunkId.chunkIndex, resp.body());
resp.body().release();
}
} else if (message instanceof ChunkFetchFailure) {
ChunkFetchFailure resp = (ChunkFetchFailure) message;
ChunkReceivedCallback listener = outstandingFetches.get(resp.streamChunkId);
if (listener == null) {
logger.warn("Ignoring response for block {} from {} ({}) since it is not outstanding",
resp.streamChunkId, getRemoteAddress(channel), resp.errorString);
} else {
outstandingFetches.remove(resp.streamChunkId);
listener.onFailure(resp.streamChunkId.chunkIndex, new ChunkFetchFailureException(
"Failure while fetching " + resp.streamChunkId + ": " + resp.errorString));
}
} else if (message instanceof RpcResponse) {
RpcResponse resp = (RpcResponse) message;
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({} bytes) since it is not outstanding",
resp.requestId, getRemoteAddress(channel), resp.body().size());
} else {
outstandingRpcs.remove(resp.requestId);
try {
listener.onSuccess(resp.body().nioByteBuffer());
} finally {
resp.body().release();
}
}
} else if (message instanceof RpcFailure) {
RpcFailure resp = (RpcFailure) message;
RpcResponseCallback listener = outstandingRpcs.get(resp.requestId);
if (listener == null) {
logger.warn("Ignoring response for RPC {} from {} ({}) since it is not outstanding",
resp.requestId, getRemoteAddress(channel), resp.errorString);
} else {
outstandingRpcs.remove(resp.requestId);
listener.onFailure(new RuntimeException(resp.errorString));
}
} else if (message instanceof StreamResponse) {
StreamResponse resp = (StreamResponse) message;
Pair<String, StreamCallback> entry = streamCallbacks.poll();
if (entry != null) {
StreamCallback callback = entry.getValue();
if (resp.byteCount > 0) {
StreamInterceptor<ResponseMessage> interceptor = new StreamInterceptor<>(
this, resp.streamId, resp.byteCount, callback);
try {
TransportFrameDecoder frameDecoder = (TransportFrameDecoder)
channel.pipeline().get(TransportFrameDecoder.HANDLER_NAME);
frameDecoder.setInterceptor(interceptor);
streamActive = true;
} catch (Exception e) {
logger.error("Error installing stream handler.", e);
deactivateStream();
}
} else {
try {
callback.onComplete(resp.streamId);
} catch (Exception e) {
logger.warn("Error in stream handler onComplete().", e);
}
}
} else {
logger.error("Could not find callback for StreamResponse.");
}
} else if (message instanceof StreamFailure) {
StreamFailure resp = (StreamFailure) message;
Pair<String, StreamCallback> entry = streamCallbacks.poll();
if (entry != null) {
StreamCallback callback = entry.getValue();
try {
callback.onFailure(resp.streamId, new RuntimeException(resp.error));
} catch (IOException ioe) {
logger.warn("Error in stream failure handler.", ioe);
}
} else {
logger.warn("Stream failure with unknown callback: {}", resp.error);
}
} else {
throw new IllegalStateException("Unknown response type: " + message.type());
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
詳細代碼暫且不管,大體可以看出也是使用if-else邏輯判斷消息的類型,然后分別進行處理。那么我么來看一些這里的消息。
---------------------?
轉載于:https://www.cnblogs.com/hyhy904/p/11007502.html
總結
以上是生活随笔為你收集整理的SparkRPC源码分析之RPC管道与消息类型的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 如何将Android的AOSP仓库放置到
- 下一篇: Flask框架 之abort、自定义错误