netty客户端源码
隨筆記錄。
?
//創建一個ChannelFactory(客戶端代碼)
ChannelFactory factory = new NioClientSocketChannelFactory(
????????????????? Executors.newCachedThreadPool(),
????????????????? Executors.newCachedThreadPool());
// NioClientSocketChannelFactory構造方法
public NioClientSocketChannelFactory(
??????????? Executor bossExecutor, Executor workerExecutor,
??????????? int bossCount, int workerCount) {
??????? ...
? ? ? ? // 線程池
??????? this.bossExecutor = bossExecutor;
// 線程池
??????? this.workerExecutor = workerExecutor;
? ? ? ?// 構建ChannelSink,NioClientSocketPipelineSink實例
? ? ? ?// bossCount默認1,workerCount默認Runtime.getRuntime().availableProcessors() * 2
??????? sink = new NioClientSocketPipelineSink(
??????????????? bossExecutor, workerExecutor, bossCount, workerCount);
}
?
// NioClientSocketPipelineSink構造方法
NioClientSocketPipelineSink(Executor bossExecutor, Executor workerExecutor,
??????????? int bossCount, int workerCount) {
??????? this.bossExecutor = bossExecutor;
? ? ??
??????? bosses = new Boss[bossCount];
??????? for (int i = 0; i < bosses.length; i ++) {
??????????? bosses[i] = new Boss(i + 1);
??????? }
???????
??????? workers = new NioWorker[workerCount];
??????? for (int i = 0; i < workers.length; i ++) {
??????????? workers[i] = new NioWorker(id, i + 1, workerExecutor);
??????? }
}
?
// 創建Bootstrap并設置factory(客戶端代碼)
ClientBootstrap bootstrap = new ClientBootstrap(factory);
// Bootstrap類set方法
public void setFactory(ChannelFactory factory) {
??????? …
??????? this.factory = factory;
}
?
// 設置ChannelPipelineFactory,實現getPipeline方法,返回一個ChannelPipeline實現類
// (客戶端代碼)
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
????????????? public ChannelPipeline getPipeline() {
????????????????? ChannelPipeline pipeline = Channels.pipeline();
????????????????? pipeline.addLast("encode",new StringEncoder());
????????????????? pipeline.addLast("decode",new StringDecoder());
????????????????? pipeline.addLast("handler1",new TimeClientHandler());
????????????????? return pipeline;
????????????? }
????????? });
DefaultChannelPipeline類addLast方法
public synchronized void addLast(String name, ChannelHandler handler) {
??? if (name2ctx.isEmpty()) {
??????? // 初始化name2ctx,head,tail
??????? init(name, handler);
??? } else {
??????? …
??????? DefaultChannelHandlerContext oldTail = tail;
??????? DefaultChannelHandlerContext??? newTail =?? new DefaultChannelHandlerContext(oldTail, null, name, handler);
??????? …
??????? // 最新的DefaultChannelHandlerContext放入tail以及更新到oldTail.next中
??????? oldTail.next = newTail;
??????? tail = newTail;
??????? name2ctx.put(name, newTail);
??????? …
??? }
}
// 客戶端發起連接請求(客戶端代碼)
bootstrap.connect (new InetSocketAddress("127.0.0.1", 8080));
// connect源代碼解讀
ClientBootstrap類connect方法
public ChannelFuture connect(final SocketAddress remoteAddress,
final SocketAddress localAddress) {
???????? …
??????? ChannelPipeline pipeline;
??????? try {
? ? ? ? ? ?// 返回 DefaultChannelPipeline對象實例
??????????? pipeline = getPipelineFactory().getPipeline();
??????? } catch (Exception e) {
??????????? throw new ChannelPipelineException("Failed to initialize a pipeline.", e);
??????? }
??????? // Set the options.
? ? ? ? // 返回NioClientSocketChannelFactory實例,并創建NioClientSocketChannel實例
??????? Channel ch = getFactory().newChannel(pipeline);
??????? ch.getConfig().setOptions(getOptions());
??????? // Bind.
??????? if (localAddress != null) {
??????????? ch.bind(localAddress);
??????? }
??????? // Connect.
??????? return ch.connect(remoteAddress);
}
NioClientSocketChannelFactory類newChannel方法
public SocketChannel newChannel(ChannelPipeline pipeline) {
????????//this為NioClientSocketChannelFactory實例
? ? ? ?//pipeline為DefaultChannelPipeline實例
? ? ? ?//sink為NioClientSocketPipelineSink實例
? ? ? ?// sink.nextWorker返回一個NioWorker實例
??????? return new NioClientSocketChannel(this, pipeline, sink, sink.nextWorker());
}
NioClientSocketChannel類構造方法
NioClientSocketChannel(
??????????? ChannelFactory factory, ChannelPipeline pipeline,
??????????? ChannelSink sink, NioWorker worker) {
? ? ? ? // ?新創建一個SocketChannel(newSocket() = > SocketChannel.open())
??????? super(null, factory, pipeline, sink, newSocket(), worker);
??????? fireChannelOpen(this);
??? }
繼續看父類NioSocketChannel構造方法
public NioSocketChannel(
??????????? Channel parent, ChannelFactory factory,
??????????? ChannelPipeline pipeline, ChannelSink sink,
??????????? SocketChannel socket, NioWorker worker) {
??????? super(parent, factory, pipeline, sink);
??????? this.socket = socket;
??????? this.worker = worker;
??????? config = new DefaultNioSocketChannelConfig(socket.socket());
}
繼續看父類AbstractChannel構造方法
protected AbstractChannel(
??????????? Channel parent, ChannelFactory factory,
??????????? ChannelPipeline pipeline, ChannelSink sink) {
?????????????????? // 傳入了一個null值
??????? this.parent = parent;
?????????????????? // NioClientSocketChannelFactory實例
??????? this.factory = factory;
?????????????????? // DefaultChannelPipeline實例
??????? this.pipeline = pipeline;
??????? id = allocateId(this);
??????? pipeline.attach(this, sink);
}
DefaultChannelPipeline類attach方法
public void attach(Channel channel, ChannelSink sink) {
??????? …
?????????????????? // NioClientSocketChannel實例
??????? this.channel = channel;
?????????????????? // NioClientSocketPipelineSink實例
??????? this.sink = sink;
}
?
// ClientBootstrap類connect方法中ch.connect(remoteAddress)
//類AbstractChannel
public ChannelFuture connect(SocketAddress remoteAddress) {
??????? return Channels.connect(this, remoteAddress);
}
//類Channels
public static ChannelFuture connect(Channel channel, SocketAddress remoteAddress) {
??????? if (remoteAddress == null) {
??????????? throw new NullPointerException("remoteAddress");
??????? }
??????? ChannelFuture future = future(channel, true);
?????????????????? // DefaultChannelPipeline
?????????????????? // 新建一個ChannelState實例DownstreamChannelStateEvent
??????? channel.getPipeline().sendDownstream(new DownstreamChannelStateEvent(
??????????????? channel, future, ChannelState.CONNECTED, remoteAddress));
??????? return future;
}
//類NioClientSocketPipelineSink
public void eventSunk(
??????????? ChannelPipeline pipeline, ChannelEvent e) throws Exception {
??????? if (e instanceof ChannelStateEvent) {
??????????? ChannelStateEvent event = (ChannelStateEvent) e;
??????????? NioClientSocketChannel channel =
??????????????? (NioClientSocketChannel) event.getChannel();
??????????? ChannelFuture future = event.getFuture();
??????????? ChannelState state = event.getState();
??????????? Object value = event.getValue();
?
??????????? switch (state) {
??????????? case OPEN:
??????????????? if (Boolean.FALSE.equals(value)) {
??????????????????? channel.worker.close(channel, future);
??????????????? }
??????????????? break;
??????????? case BOUND:
??????????????? if (value != null) {
??????????????????? bind(channel, future, (SocketAddress) value);
??????????????? } else {
??????????????????? channel.worker.close(channel, future);
??????????????? }
??????????????? break;
??????????? case CONNECTED:
??????????????? if (value != null) {
?????????????????????????????????????????????? //第一次客戶端發起連接
??????????????????? connect(channel, future, (SocketAddress) value);
??????????????? } else {
??????????????????? channel.worker.close(channel, future);
??????????????? }
??????????????? break;
??????????? case INTEREST_OPS:
??????????????? channel.worker.setInterestOps(channel, future, ((Integer) value).intValue());
??????????????? break;
??????????? }
??????? } else if (e instanceof MessageEvent) {
??????????? MessageEvent event = (MessageEvent) e;
??????????? NioSocketChannel channel = (NioSocketChannel) event.getChannel();
??????????? boolean offered = channel.writeBuffer.offer(event);
??????????? assert offered;
??????????? channel.worker.writeFromUserCode(channel);
??????? }
}
private void connect(
??????????? final NioClientSocketChannel channel, final ChannelFuture cf,
??????????? SocketAddress remoteAddress) {
??????? try {
// channel.socket在初始化NioClientSocketChannel時創建
//nio發起連接,因為設置了socket.configureBlocking(false)
//connect方法立即返回,返回值為false
//此時服務端已經收到了客戶端發送的connect事件并進行處理
??????????? if (channel.socket.connect(remoteAddress)) {
??????????????? channel.worker.register(channel, cf);
??????????? } else {
??????????????? channel.getCloseFuture().addListener(new ChannelFutureListener() {
??????????????????? public void operationComplete(ChannelFuture f)
??????????????????????????? throws Exception {
??????????????????????? if (!cf.isDone()) {
??????????????????????????? cf.setFailure(new ClosedChannelException());
??????????????????????? }
??????????????????? }
???? ???????????});
??????????????? cf.addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
??????????????? channel.connectFuture = cf;
???????????????????????????????????? //注冊事件,nextBoss()返回一個Runnable實例
??????????????? nextBoss().register(channel);
??????????? }
?
??????? } catch (Throwable t) {
??????????? cf.setFailure(t);
??????????? fireExceptionCaught(channel, t);
??????????? channel.worker.close(channel, succeededFuture(channel));
??????? }
}
// Boss內部類 NioClientSocketPipelineSink
void register(NioClientSocketChannel channel) {
??? Runnable registerTask = new RegisterTask(this, channel);
??? Selector selector;
?
??? synchronized (startStopLock) {
??????? if (!started) {
??????????? // Open a selector if this worker didn't start yet.
??????????? try {
??????????????? // 打開一個選擇器
????????? ??????this.selector = selector =? Selector.open();
??????????? } catch (Throwable t) {
??????????????? throw new ChannelException(
??????????????????????? "Failed to create a selector.", t);
??????????? }
?
??????????? // Start the worker thread with the new Selector.
??????????? boolean success = false;
??????????? try {
??????????????? //啟動線程,消費任務隊列
//bossExecutor是客戶端代碼Executors.newCachedThreadPool()所創建
// nio的selector.select(500)操作
??????????????? DeadLockProofWorker.start(
??????????????????????? bossExecutor,
??????????????????????? new ThreadRenamingRunnable(
??????????????????????????????? this, "New I/O client boss #" + id + '-' + subId));
??????????????? success = true;
??????????? } finally {
??????????????? if (!success) {
????????????????? ??// Release the Selector if the execution fails.
??????????????????? try {
??????????????????????? selector.close();
??????????????????? } catch (Throwable t) {
??????????????????????? logger.warn("Failed to close a selector.", t);
??????????????????? }
??????????????????? this.selector = selector = null;
??????????????????? // The method will return to the caller at this point.
??????????????? }
??????????? }
??????? } else {
??????????? // Use the existing selector if this worker has been started.
????? ??????selector = this.selector;
??????? }
?
??????? assert selector != null && selector.isOpen();
?
??????? started = true;
??????? //寫入隊列一個注冊任務
??????? boolean offered = registerTaskQueue.offer(registerTask);
??????? assert offered;
??? }
?
??? if (wakenUp.compareAndSet(false, true)) {
??????? selector.wakeup();
??? }
}
//類DeadLockProofWorker
public static void start(final Executor parent, final Runnable runnable) {
? //parent為bossExecutor,即一個線程池
? ? ? ? ......
//開啟一個子線程
??????? parent.execute(new Runnable() {
??????????? public void run() {
??????????????? PARENT.set(parent);
??????????????? try {
? ? ? ? ? ? ? ? ? ?// ThreadRenamingRunnable實例
??????????????????? runnable.run();
??????????????? } finally {
??????????????????? PARENT.remove();
??????????????? }
??????????? }
??????? });
}
//類ThreadRenamingRunnable
public void run() {
? ? ? ?......
??????? // Run the actual runnable and revert the name back when it ends.
??????? try {
? ? ? ? ? //runnable為Boss實例
??????????? runnable.run();
??????? } finally {
??????????? if (renamed) {
??????????????? // Revert the name back if the current thread was renamed.
??????????????? // We do not check the exception here because we know it works.
??????????????? currentThread.setName(oldThreadName);
??????????? }
??????? }
}
// Boss內部類 NioClientSocketPipelineSink中
public void run() {
??? boolean shutdown = false;
??? Selector selector = this.selector;
??? long lastConnectTimeoutCheckTimeNanos = System.nanoTime();
??? for (;;) {
??????? wakenUp.set(false);
?
??????? try {
? ? ? ? ? ? // 設置超時阻塞
??????????? int selectedKeyCount = selector.select(500);
?
??????????? if (wakenUp.get()) {
??????????????? selector.wakeup();
??????????? }
? ? ? ? ? ? // 消費隊列中的事件
? ? ? ? ? ? //nio中register操作
???????? ???processRegisterTaskQueue();
?
??????????? if (selectedKeyCount > 0) {
? ? ? ? ? ? ? ? //處理選擇器獲取到的事件
??????????????? processSelectedKeys(selector.selectedKeys());
??????????? }
??????????? ……
??????? } catch (Throwable t) {
?????????? ……
??????? }
??? }
}
?
private void processRegisterTaskQueue() {
for (;;) {
???????? //獲取事件,task為registerTaskQueue.offer(registerTask);RegisterTask實例
??????? final Runnable task = registerTaskQueue.poll();
??????? if (task == null) {
??????????? break;
??????? }
? ? ? ?//執行NioClientSocketPipelineSink中的內部類RegisterTask的Run方法
??????? task.run();
??? }
}
?
//內部類RegisterTask NioClientSocketPipelineSink中
public void run() {
try {
???????? // nio socket注冊,只有完成注冊以后,才能和服務端進行通信
??????? channel.socket.register(
??????????????? boss.selector, SelectionKey.OP_CONNECT, channel);
??? } catch (ClosedChannelException e) {
??????? channel.worker.close(channel, succeededFuture(channel));
??? }
?? ……
}
private void processSelectedKeys(Set<SelectionKey> selectedKeys) {
??? for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
??????? SelectionKey k = i.next();
??????? i.remove();
?
??????? if (!k.isValid()) {
??????????? close(k);
??????????? continue;
??????? }
?
??????? if (k.isConnectable()) {
? ? ? ? ? ? //完成客戶端連接
??????????? connect(k);
??????? }
??? }
}
?
private void connect(SelectionKey k) {
??? NioClientSocketChannel ch = (NioClientSocketChannel) k.attachment();
try {
???????? //nio完成客戶端連接
??????? if (ch.socket.finishConnect()) {
??????????? k.cancel();
? ? ? ? ? ? //NioWorker類注冊
??????????? ch.worker.register(ch, ch.connectFuture);
??????? }
??? } catch (Throwable t) {
? ? ? ?.......
??? }
}
?
類NioWorker負責讀寫事件注冊處理
未完待續...
轉載于:https://www.cnblogs.com/liuxinan/p/6073424.html
創作挑戰賽新人創作獎勵來咯,堅持創作打卡瓜分現金大獎總結
以上是生活随笔為你收集整理的netty客户端源码的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: [Mac]Python 安装MySQLd
- 下一篇: extJs项目实战