A Look at Flink's ConnectionManager
Preface
This article takes a look at Flink's ConnectionManager.
ConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ConnectionManager.java
    public interface ConnectionManager {

        void start(ResultPartitionProvider partitionProvider,
                    TaskEventDispatcher taskEventDispatcher) throws IOException;

        /**
         * Creates a {@link PartitionRequestClient} instance for the given {@link ConnectionID}.
         */
        PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException;

        /**
         * Closes opened ChannelConnections in case of a resource release.
         */
        void closeOpenChannelConnections(ConnectionID connectionId);

        int getNumberOfActiveConnections();

        int getDataPort();

        void shutdown() throws IOException;

    }

- ConnectionManager is an interface that defines start, shutdown, closeOpenChannelConnections and related methods for managing physical connections. It has two implementations: LocalConnectionManager and NettyConnectionManager.
LocalConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/LocalConnectionManager.java
    public class LocalConnectionManager implements ConnectionManager {

        @Override
        public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) {
        }

        @Override
        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) {
            return null;
        }

        @Override
        public void closeOpenChannelConnections(ConnectionID connectionId) {}

        @Override
        public int getNumberOfActiveConnections() {
            return 0;
        }

        @Override
        public int getDataPort() {
            return -1;
        }

        @Override
        public void shutdown() {}
    }

- LocalConnectionManager implements the ConnectionManager interface, but its methods are essentially no-ops: createPartitionRequestClient returns null, getNumberOfActiveConnections returns 0, and getDataPort returns -1.
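The no-op behaviour means callers have to treat `null` and `-1` as "nothing there" rather than as usable values. The following is a minimal sketch of that idea, not Flink code: `MiniConnectionManager`, `MiniLocalConnectionManager` and `LocalManagerDemo` are hypothetical names, and a plain `String` stands in for `PartitionRequestClient`.

```java
import java.util.Optional;

/** Hypothetical, trimmed-down stand-in for Flink's ConnectionManager (not the real interface). */
interface MiniConnectionManager {
    String createClient(String connectionId); // the real method returns a PartitionRequestClient
    int getNumberOfActiveConnections();
    int getDataPort();
}

/** No-op implementation in the spirit of LocalConnectionManager. */
class MiniLocalConnectionManager implements MiniConnectionManager {
    @Override public String createClient(String connectionId) { return null; }
    @Override public int getNumberOfActiveConnections() { return 0; }
    @Override public int getDataPort() { return -1; }
}

class LocalManagerDemo {
    /** Maps the -1 sentinel to an empty Optional so it is never used as a port number. */
    static Optional<Integer> dataPort(MiniConnectionManager manager) {
        int port = manager.getDataPort();
        return port >= 0 ? Optional.of(port) : Optional.empty();
    }

    /** A null client means no remote connection is available from this manager. */
    static boolean hasRemoteClient(MiniConnectionManager manager, String connectionId) {
        return manager.createClient(connectionId) != null;
    }
}
```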
NettyConnectionManager
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
    public class NettyConnectionManager implements ConnectionManager {

        private final NettyServer server;

        private final NettyClient client;

        private final NettyBufferPool bufferPool;

        private final PartitionRequestClientFactory partitionRequestClientFactory;

        public NettyConnectionManager(NettyConfig nettyConfig) {
            this.server = new NettyServer(nettyConfig);
            this.client = new NettyClient(nettyConfig);
            this.bufferPool = new NettyBufferPool(nettyConfig.getNumberOfArenas());

            this.partitionRequestClientFactory = new PartitionRequestClientFactory(client);
        }

        @Override
        public void start(ResultPartitionProvider partitionProvider, TaskEventDispatcher taskEventDispatcher) throws IOException {
            NettyProtocol partitionRequestProtocol = new NettyProtocol(
                partitionProvider,
                taskEventDispatcher,
                client.getConfig().isCreditBasedEnabled());

            client.init(partitionRequestProtocol, bufferPool);
            server.init(partitionRequestProtocol, bufferPool);
        }

        @Override
        public PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId)
                throws IOException, InterruptedException {
            return partitionRequestClientFactory.createPartitionRequestClient(connectionId);
        }

        @Override
        public void closeOpenChannelConnections(ConnectionID connectionId) {
            partitionRequestClientFactory.closeOpenChannelConnections(connectionId);
        }

        @Override
        public int getNumberOfActiveConnections() {
            return partitionRequestClientFactory.getNumberOfActiveClients();
        }

        @Override
        public int getDataPort() {
            if (server != null && server.getLocalAddress() != null) {
                return server.getLocalAddress().getPort();
            } else {
                return -1;
            }
        }

        @Override
        public void shutdown() {
            client.shutdown();
            server.shutdown();
        }

        NettyClient getClient() {
            return client;
        }

        NettyServer getServer() {
            return server;
        }

        NettyBufferPool getBufferPool() {
            return bufferPool;
        }
    }

- NettyConnectionManager implements the ConnectionManager interface. Its constructor uses NettyConfig to create a NettyServer, a NettyClient and a NettyBufferPool, and uses the NettyClient to create a PartitionRequestClientFactory.
- The start method creates a NettyProtocol and uses it, together with the buffer pool, to initialize the NettyClient and then the NettyServer. The shutdown method shuts down the NettyClient and the NettyServer. closeOpenChannelConnections delegates to partitionRequestClientFactory.closeOpenChannelConnections to close the connection for the given connectionId.
- The createPartitionRequestClient method delegates to partitionRequestClientFactory.createPartitionRequestClient to create the PartitionRequestClient.
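To make the lifecycle concrete, here is a small sketch of a manager that owns a client and a server endpoint and simply forwards start and shutdown to both, client first, as the listing above does. It is an illustration under assumptions: `MiniEndpoint` and `MiniNettyConnectionManager` are hypothetical names, a `String` stands in for `NettyProtocol`, and the call log exists only so the ordering can be observed.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for NettyClient / NettyServer that only records calls. */
class MiniEndpoint {
    private final String name;
    private final List<String> log;

    MiniEndpoint(String name, List<String> log) {
        this.name = name;
        this.log = log;
    }

    void init(String protocol) { log.add(name + ".init(" + protocol + ")"); }

    void shutdown() { log.add(name + ".shutdown"); }
}

/** Facade that mirrors the delegation order of start() and shutdown(). */
class MiniNettyConnectionManager {
    final List<String> log = new ArrayList<>();
    private final MiniEndpoint server = new MiniEndpoint("server", log);
    private final MiniEndpoint client = new MiniEndpoint("client", log);

    void start(String protocol) {
        // Both endpoints are initialized with the same protocol object.
        client.init(protocol);
        server.init(protocol);
    }

    void shutdown() {
        client.shutdown();
        server.shutdown();
    }
}
```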
PartitionRequestClientFactory
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
    class PartitionRequestClientFactory {

        private final NettyClient nettyClient;

        private final ConcurrentMap<ConnectionID, Object> clients = new ConcurrentHashMap<ConnectionID, Object>();

        PartitionRequestClientFactory(NettyClient nettyClient) {
            this.nettyClient = nettyClient;
        }

        /**
         * Atomically establishes a TCP connection to the given remote address and
         * creates a {@link PartitionRequestClient} instance for this connection.
         */
        PartitionRequestClient createPartitionRequestClient(ConnectionID connectionId) throws IOException, InterruptedException {
            Object entry;
            PartitionRequestClient client = null;

            while (client == null) {
                entry = clients.get(connectionId);

                if (entry != null) {
                    // Existing channel or connecting channel
                    if (entry instanceof PartitionRequestClient) {
                        client = (PartitionRequestClient) entry;
                    }
                    else {
                        ConnectingChannel future = (ConnectingChannel) entry;
                        client = future.waitForChannel();

                        clients.replace(connectionId, future, client);
                    }
                }
                else {
                    // No channel yet. Create one, but watch out for a race.
                    // We create a "connecting future" and atomically add it to the map.
                    // Only the thread that really added it establishes the channel.
                    // The others need to wait on that original establisher's future.
                    ConnectingChannel connectingChannel = new ConnectingChannel(connectionId, this);
                    Object old = clients.putIfAbsent(connectionId, connectingChannel);

                    if (old == null) {
                        nettyClient.connect(connectionId.getAddress()).addListener(connectingChannel);

                        client = connectingChannel.waitForChannel();

                        clients.replace(connectionId, connectingChannel, client);
                    }
                    else if (old instanceof ConnectingChannel) {
                        client = ((ConnectingChannel) old).waitForChannel();

                        clients.replace(connectionId, old, client);
                    }
                    else {
                        client = (PartitionRequestClient) old;
                    }
                }

                // Make sure to increment the reference count before handing a client
                // out to ensure correct bookkeeping for channel closing.
                if (!client.incrementReferenceCounter()) {
                    destroyPartitionRequestClient(connectionId, client);
                    client = null;
                }
            }

            return client;
        }

        public void closeOpenChannelConnections(ConnectionID connectionId) {
            Object entry = clients.get(connectionId);

            if (entry instanceof ConnectingChannel) {
                ConnectingChannel channel = (ConnectingChannel) entry;

                if (channel.dispose()) {
                    clients.remove(connectionId, channel);
                }
            }
        }

        int getNumberOfActiveClients() {
            return clients.size();
        }

        /**
         * Removes the client for the given {@link ConnectionID}.
         */
        void destroyPartitionRequestClient(ConnectionID connectionId, PartitionRequestClient client) {
            clients.remove(connectionId, client);
        }

        //......
    }

- The PartitionRequestClientFactory constructor takes a NettyClient. The factory keeps an in-memory ConcurrentHashMap that maps each ConnectionID to either a PartitionRequestClient or a ConnectingChannel.
- createPartitionRequestClient first looks up the ConnectionID in the map. If the entry is a PartitionRequestClient, it is used directly. If the entry is a ConnectingChannel, the method calls ConnectingChannel.waitForChannel to wait for the PartitionRequestClient and then replaces the map entry with it. If there is no entry, the method creates a ConnectingChannel and adds it with putIfAbsent, which also returns the previous value (old). If old is null, this thread won the race: it connects via nettyClient.connect, waits for the PartitionRequestClient, and replaces the map entry. If old is a ConnectingChannel, it waits on that channel and replaces the map entry; otherwise old is already a PartitionRequestClient. Before returning, the method calls client.incrementReferenceCounter(). If the increment fails, it calls destroyPartitionRequestClient, resets client to null, and the while loop retries; the method only returns once the increment succeeds.
- closeOpenChannelConnections only acts when the map entry is a ConnectingChannel: it calls ConnectingChannel.dispose and, if that returns true, removes the entry from the map.
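The lookup logic described above boils down to two ideas: a placeholder is published with `putIfAbsent` so that only one thread connects, and a client is handed out only after its reference counter was incremented, otherwise the loop retries. The sketch below illustrates both under stated assumptions. It is not Flink's implementation: `MiniClient` and `MiniClientFactory` are hypothetical, connecting is a synchronous `Function` call, and a `CompletableFuture` is swapped in for Flink's `ConnectingChannel`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Hypothetical client with a reference counter; a negative count means "disposed". */
class MiniClient {
    final String address;
    private final AtomicInteger refs = new AtomicInteger(0);

    MiniClient(String address) { this.address = address; }

    /** Returns false if the client has already been disposed. */
    boolean incrementReferenceCounter() {
        int current;
        do {
            current = refs.get();
            if (current < 0) {
                return false;
            }
        } while (!refs.compareAndSet(current, current + 1));
        return true;
    }

    /** Disposes the client only if nobody holds a reference. */
    boolean disposeIfNotUsed() { return refs.compareAndSet(0, -1); }
}

class MiniClientFactory {
    private final ConcurrentMap<String, Object> clients = new ConcurrentHashMap<>();
    private final Function<String, MiniClient> connector; // assumed synchronous "connect"
    final AtomicInteger connectCalls = new AtomicInteger();

    MiniClientFactory(Function<String, MiniClient> connector) { this.connector = connector; }

    int size() { return clients.size(); }

    MiniClient getOrCreate(String address) {
        MiniClient client = null;
        while (client == null) {
            CompletableFuture<MiniClient> placeholder = new CompletableFuture<>();
            Object old = clients.putIfAbsent(address, placeholder);
            if (old == null) {
                // This thread won the race, so it is the only one that connects.
                connectCalls.incrementAndGet();
                try {
                    placeholder.complete(connector.apply(address));
                } catch (RuntimeException e) {
                    placeholder.completeExceptionally(e); // wake up waiting threads
                    clients.remove(address, placeholder);
                    throw e;
                }
                client = placeholder.join();
                clients.replace(address, placeholder, client);
            } else if (old instanceof CompletableFuture) {
                // Another thread is connecting: wait for its result.
                @SuppressWarnings("unchecked")
                CompletableFuture<MiniClient> other = (CompletableFuture<MiniClient>) old;
                client = other.join();
                clients.replace(address, old, client);
            } else {
                client = (MiniClient) old;
            }
            // A disposed client must not be handed out: drop it and retry.
            if (!client.incrementReferenceCounter()) {
                clients.remove(address, client);
                client = null;
            }
        }
        return client;
    }
}
```

Requesting the same address twice should return the same instance and connect only once. Using the two-argument `remove` and the three-argument `replace` keeps a stale thread from overwriting an entry that another thread has already changed.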
ConnectingChannel
flink-release-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
    private static final class ConnectingChannel implements ChannelFutureListener {

        private final Object connectLock = new Object();

        private final ConnectionID connectionId;

        private final PartitionRequestClientFactory clientFactory;

        private boolean disposeRequestClient = false;

        public ConnectingChannel(ConnectionID connectionId, PartitionRequestClientFactory clientFactory) {
            this.connectionId = connectionId;
            this.clientFactory = clientFactory;
        }

        private boolean dispose() {
            boolean result;
            synchronized (connectLock) {
                if (partitionRequestClient != null) {
                    result = partitionRequestClient.disposeIfNotUsed();
                }
                else {
                    disposeRequestClient = true;
                    result = true;
                }

                connectLock.notifyAll();
            }

            return result;
        }

        private void handInChannel(Channel channel) {
            synchronized (connectLock) {
                try {
                    NetworkClientHandler clientHandler = channel.pipeline().get(NetworkClientHandler.class);
                    partitionRequestClient = new PartitionRequestClient(
                        channel, clientHandler, connectionId, clientFactory);

                    if (disposeRequestClient) {
                        partitionRequestClient.disposeIfNotUsed();
                    }

                    connectLock.notifyAll();
                }
                catch (Throwable t) {
                    notifyOfError(t);
                }
            }
        }

        private volatile PartitionRequestClient partitionRequestClient;

        private volatile Throwable error;

        private PartitionRequestClient waitForChannel() throws IOException, InterruptedException {
            synchronized (connectLock) {
                while (error == null && partitionRequestClient == null) {
                    connectLock.wait(2000);
                }
            }

            if (error != null) {
                throw new IOException("Connecting the channel failed: " + error.getMessage(), error);
            }

            return partitionRequestClient;
        }

        private void notifyOfError(Throwable error) {
            synchronized (connectLock) {
                this.error = error;
                connectLock.notifyAll();
            }
        }

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                handInChannel(future.channel());
            }
            else if (future.cause() != null) {
                notifyOfError(new RemoteTransportException(
                    "Connecting to remote task manager + '" + connectionId.getAddress() +
                        "' has failed. This might indicate that the remote task " +
                        "manager has been lost.",
                    connectionId.getAddress(), future.cause()));
            }
            else {
                notifyOfError(new LocalTransportException(
                    String.format(
                        "Connecting to remote task manager '%s' has been cancelled.",
                        connectionId.getAddress()),
                    null));
            }
        }
    }

- ConnectingChannel implements Netty's ChannelFutureListener interface. When the ChannelFuture succeeds, operationComplete calls handInChannel, which creates the PartitionRequestClient; when it fails or is cancelled, operationComplete calls notifyOfError with a RemoteTransportException or a LocalTransportException respectively. waitForChannel blocks on connectLock, re-checking every 2000 ms, until the PartitionRequestClient has been created or an error has been recorded; it returns the client in the first case and throws an IOException in the second.
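The core of ConnectingChannel is a one-shot handoff guarded by a lock: one thread delivers a value or an error, and waiting threads loop on `wait` until either is present. The following is a minimal sketch of that pattern only. `Handoff` and `HandoffDemo` are hypothetical names, a `String` stands in for the Netty channel, and the disposal flag from the listing above is left out.

```java
import java.io.IOException;

/** One-shot handoff of a value or an error between threads, guarded by a lock. */
class Handoff<T> {
    private final Object lock = new Object();
    private T value;
    private Throwable error;

    void handIn(T newValue) {
        synchronized (lock) {
            value = newValue;
            lock.notifyAll();
        }
    }

    void fail(Throwable cause) {
        synchronized (lock) {
            error = cause;
            lock.notifyAll();
        }
    }

    T await() throws IOException, InterruptedException {
        synchronized (lock) {
            // Loop guards against spurious wake-ups; the timeout bounds each wait.
            while (error == null && value == null) {
                lock.wait(2000);
            }
            if (error != null) {
                throw new IOException("Connecting failed: " + error.getMessage(), error);
            }
            return value;
        }
    }
}

class HandoffDemo {
    /** A second thread plays the role of the connect callback delivering a channel. */
    static String connectAndWait() {
        Handoff<String> handoff = new Handoff<>();
        new Thread(() -> handoff.handIn("channel-1")).start();
        try {
            return handoff.await();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    /** A failed connect surfaces to the waiting thread as an IOException. */
    static String failAndWait() {
        Handoff<String> handoff = new Handoff<>();
        new Thread(() -> handoff.fail(new RuntimeException("refused"))).start();
        try {
            handoff.await();
            return "no error";
        } catch (IOException e) {
            return e.getMessage();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Because the condition is checked inside the `synchronized` block before waiting, a value handed in before `await` is called is still seen and the waiter returns immediately.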
Summary
- ConnectionManager is an interface that defines start, shutdown, closeOpenChannelConnections and related methods for managing physical connections. It has two implementations: LocalConnectionManager and NettyConnectionManager.
- LocalConnectionManager implements ConnectionManager with essentially no-op methods. NettyConnectionManager also implements it: the constructor uses NettyConfig to create a NettyServer, a NettyClient and a NettyBufferPool, and uses the NettyClient to create a PartitionRequestClientFactory. start creates a NettyProtocol and initializes the NettyClient and NettyServer, shutdown shuts both down, closeOpenChannelConnections delegates to partitionRequestClientFactory.closeOpenChannelConnections for the given connectionId, and createPartitionRequestClient delegates to partitionRequestClientFactory.createPartitionRequestClient.
- The PartitionRequestClientFactory constructor takes a NettyClient, and the factory keeps an in-memory ConcurrentHashMap that maps each ConnectionID to either a PartitionRequestClient or a ConnectingChannel. ConnectingChannel implements Netty's ChannelFutureListener interface: when the ChannelFuture succeeds, operationComplete calls handInChannel, which creates the PartitionRequestClient, and waitForChannel waits until the PartitionRequestClient has been created (or an error has been recorded) before returning.