@Override protectedvoiddoRegister()throws Exception { boolean selected = false; for (;;) { try { //调用ServerSocketChannel的register方法,把当前服务端对象注册到boss线程的selector上 selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } } }
privatevoidexecute(Runnable task, boolean immediate){ boolean inEventLoop = inEventLoop(); addTask(task); if (!inEventLoop) { startThread(); //启动线程 if (isShutdown()) { boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } }
if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } }
protectedvoidrun(){ int selectCnt = 0; for (;;) { try { int strategy; try { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue;
case SelectStrategy.BUSY_WAIT:
case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } catch (IOException e) { // If we receive an IOException here its because the Selector is messed up. Let's rebuild // the selector and retry. https://github.com/netty/netty/issues/8566 rebuildSelector0(); selectCnt = 0; handleLoopException(e); continue; }
selectCnt++; cancelledKeys = 0; needsToSelectAgain = false; finalint ioRatio = this.ioRatio; boolean ranTasks; if (ioRatio == 100) { try { if (strategy > 0) { processSelectedKeys(); } } finally { // Ensure we always run tasks. ranTasks = runAllTasks(); } } elseif (strategy > 0) { finallong ioStartTime = System.nanoTime(); try { processSelectedKeys(); } finally { // Ensure we always run tasks. finallong ioTime = System.nanoTime() - ioStartTime; ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } } else { ranTasks = runAllTasks(0); // This will run the minimum number of tasks }
if (ranTasks || strategy > 0) { if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } elseif (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) selectCnt = 0; } } catch (CancelledKeyException e) { // Harmless exception - log anyway if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?", selector, e); } } catch (Error e) { throw (Error) e; } catch (Throwable t) { handleLoopException(t); } finally { // Always handle shutdown even if the loop processing threw an exception. try { if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { return; } } } catch (Error e) { throw (Error) e; } catch (Throwable t) { handleLoopException(t); } } } }
protectedvoidrun(){ int selectCnt = 0; for (;;) { try { int strategy; try { strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks()); switch (strategy) { case SelectStrategy.CONTINUE: continue;
case SelectStrategy.BUSY_WAIT: // fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT: long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); } // fall through default: } } //省略.... } } }
// Excerpt: dispatch on the strategy value computed earlier in the event loop.
switch (strategy) {
case SelectStrategy.CONTINUE:
    continue;

case SelectStrategy.BUSY_WAIT:
    // fall-through to SELECT since the busy-wait is not supported with NIO

case SelectStrategy.SELECT:
    long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
    if (curDeadlineNanos == -1L) {
        curDeadlineNanos = NONE; // nothing on the calendar
    }
    nextWakeupNanos.set(curDeadlineNanos);
    try {
        if (!hasTasks()) {
            strategy = select(curDeadlineNanos);
        }
    } finally {
        // This update is just to help block unnecessary selector wakeups
        // so use of lazySet is ok (no race condition)
        nextWakeupNanos.lazySet(AWAKE);
    }
    // fall through
default:
}
SelectStrategy.SELECT
当NioEventLoop线程中不存在异步任务时,则开始执行SELECT策略
//下一次定时任务触发截至时间,默认不是定时任务,返回 -1L long curDeadlineNanos = nextScheduledTaskDeadlineNanos(); if (curDeadlineNanos == -1L) { curDeadlineNanos = NONE; // nothing on the calendar } nextWakeupNanos.set(curDeadlineNanos); try { if (!hasTasks()) { //2. taskQueue中任务执行完,开始执行select进行阻塞 strategy = select(curDeadlineNanos); } } finally { // This update is just to help block unnecessary selector wakeups // so use of lazySet is ok (no race condition) nextWakeupNanos.lazySet(AWAKE); }
privatevoidprocessSelectedKeysOptimized(){ for (int i = 0; i < selectedKeys.size; ++i) { //1. 取出IO事件以及对应的channel final SelectionKey k = selectedKeys.keys[i]; selectedKeys.keys[i] = null;//k的引用置null,便于gc回收,也表示该channel的事件处理完成避免重复处理
final Object a = k.attachment(); //获取保存在当前channel中的attachment,此时应该是NioServerSocketChannel //处理当前的channel if (a instanceof AbstractNioChannel) { //对于boss NioEventLoop,轮询到的基本是连接事件,后续的事情就是通过他的pipeline将连接扔给一个worker NioEventLoop处理 //对于worker NioEventLoop来说,轮循道的基本商是IO读写事件,后续的事情就是通过他的pipeline将读取到的字节流传递给每个channelHandler来处理 processSelectedKey(k, (AbstractNioChannel) a); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; processSelectedKey(k, task); } if (needsToSelectAgain) { // null out entries in the array to allow to have it GC'ed once the Channel close // See https://github.com/netty/netty/issues/2363 selectedKeys.reset(i + 1);
selectAgain(); i = -1; } } }
processSelectedKey
privatevoidprocessSelectedKey(SelectionKey k, AbstractNioChannel ch){ final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe(); if (!k.isValid()) { final EventLoop eventLoop; try { eventLoop = ch.eventLoop(); } catch (Throwable ignored) { } if (eventLoop == this) { // close the channel if the key is not valid anymore unsafe.close(unsafe.voidPromise()); } return; }
try { int readyOps = k.readyOps(); //获取当前key所属的操作类型 if ((readyOps & SelectionKey.OP_CONNECT) != 0) {//如果是连接类型 int ops = k.interestOps(); ops &= ~SelectionKey.OP_CONNECT; k.interestOps(ops);
// Excerpt: deliver every buffered message to the pipeline, then signal read completion.
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
    readPending = false;
    // Invoke channelRead on the pipeline for this message.
    pipeline.fireChannelRead(readBuf.get(i));
}
readBuf.clear();
allocHandle.readComplete();
pipeline.fireChannelReadComplete();
if (exception != null) { closed = closeOnReadError(exception);
privatebooleanfetchFromScheduledTaskQueue(){ if (scheduledTaskQueue == null || scheduledTaskQueue.isEmpty()) { returntrue; } long nanoTime = AbstractScheduledEventExecutor.nanoTime(); for (;;) { Runnable scheduledTask = pollScheduledTask(nanoTime); if (scheduledTask == null) { returntrue; } if (!taskQueue.offer(scheduledTask)) { // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again. scheduledTaskQueue.add((ScheduledFutureTask<?>) scheduledTask); returnfalse; } } }
privatevoidexecute(Runnable task, boolean immediate){ boolean inEventLoop = inEventLoop(); addTask(task); //把当前任务添加到阻塞队列中 if (!inEventLoop) { //如果是非NioEventLoop startThread(); //启动线程 if (isShutdown()) { //如果当前NioEventLoop已经是停止状态 boolean reject = false; try { if (removeTask(task)) { reject = true; } } catch (UnsupportedOperationException e) { // The task queue does not support removal so the best thing we can do is to just move on and // hope we will be able to pick-up the task before its completely terminated. // In worst case we will log on termination. } if (reject) { reject(); } } }
if (!addTaskWakesUp && immediate) { wakeup(inEventLoop); } }
@Override protectedvoidrun(){ int selectCnt = 0; for (;;) { //selectCnt记录的是无功而返的select次数,即eventLoop空转的次数,为解决NIO BUG selectCnt++; //ranTasks=true,或strategy>0,说明eventLoop干活了,没有空转,清空selectCnt if (ranTasks || strategy > 0) { //如果选择操作计数器的值,大于最小选择器重构阈值,则输出log if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.", selectCnt - 1, selector); } selectCnt = 0; } //unexpectedSelectorWakeup处理NIO BUG elseif (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case) selectCnt = 0; } } }
unexpectedSelectorWakeup
privatebooleanunexpectedSelectorWakeup(int selectCnt){ if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop."); } returntrue; } //如果选择重构的阈值大于0, 默认值是512次、 并且当前触发的空轮询次数大于 512次。,则触发重构 if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { // The selector returned prematurely many times in a row. // Rebuild the selector to work around the problem. logger.warn("Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.", selectCnt, selector); rebuildSelector(); returntrue; } returnfalse; }
privatevoidrebuildSelector0(){ final Selector oldSelector = selector; //获取老的selector选择器 final SelectorTuple newSelectorTuple; //定义新的选择器
if (oldSelector == null) { //如果老的选择器为空,直接返回 return; }
try { newSelectorTuple = openSelector(); //创建一个新的选择器 } catch (Exception e) { logger.warn("Failed to create a new Selector.", e); return; }
// Register all channels to the new Selector. int nChannels = 0; for (SelectionKey key: oldSelector.keys()) {//遍历注册到选择器的选择key集合 Object a = key.attachment(); try { //如果选择key无效或选择关联的通道已经注册到新的选择器,则跳出当前循环 if (!key.isValid() || key.channel().keyFor(newSelectorTuple.unwrappedSelector) != null) { continue; } //获取key的选择关注事件集 int interestOps = key.interestOps(); key.cancel();//取消选择key //注册选择key到新的选择器 SelectionKey newKey = key.channel().register(newSelectorTuple.unwrappedSelector, interestOps, a); if (a instanceof AbstractNioChannel) {//如果是nio通道,则更新通道的选择key // Update SelectionKey ((AbstractNioChannel) a).selectionKey = newKey; } nChannels ++; } catch (Exception e) { logger.warn("Failed to re-register a Channel to the new Selector.", e); if (a instanceof AbstractNioChannel) { AbstractNioChannel ch = (AbstractNioChannel) a; ch.unsafe().close(ch.unsafe().voidPromise()); } else { @SuppressWarnings("unchecked") NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a; invokeChannelUnregistered(task, key, e); } } } //更新当前事件循环选择器 selector = newSelectorTuple.selector; unwrappedSelector = newSelectorTuple.unwrappedSelector;
try { // time to close the old selector as everything else is registered to the new one oldSelector.close(); //关闭原始选择器 } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("Failed to close the old Selector.", t); } }
if (logger.isInfoEnabled()) { logger.info("Migrated " + nChannels + " channel(s) to the new Selector."); } }
// Excerpt: wrap an accepted socket in a NioSocketChannel and add it to the read buffer.
try {
    if (ch != null) {
        buf.add(new NioSocketChannel(this, ch)); // author's marker: this is the line of interest
        return 1;
    }
} catch (Throwable t) {
    logger.warn("Failed to create a new channel from an accepted socket.", t);

    // Wrapping failed, so close the accepted socket.
    try {
        ch.close();
    } catch (Throwable t2) {
        logger.warn("Failed to close a socket.", t2);
    }
}
privatevoidregister0(ChannelPromise promise){ try { // check if the channel is still open as it could be closed in the mean time when the register // call was outside of the eventLoop if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; doRegister(); neverRegistered = false; registered = true; // pipeline.invokeHandlerAddedIfNeeded();
@Override publicvoidhandlerAdded(ChannelHandlerContext ctx)throws Exception { if (ctx.channel().isRegistered()) { // This should always be true with our current DefaultChannelPipeline implementation. // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers // will be added in the expected order. if (initChannel(ctx)) {
// We are done with init the Channel, removing the initializer now. removeState(ctx); } } }
接着,调用initChannel抽象方法,该方法由具体的实现类来完成。
privatebooleaninitChannel(ChannelHandlerContext ctx)throws Exception { if (initMap.add(ctx)) { // Guard against re-entrance. try { initChannel((C) ctx.channel()); } catch (Throwable cause) { // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...). // We do so to prevent multiple calls to initChannel(...). exceptionCaught(ctx, cause); } finally { ChannelPipeline pipeline = ctx.pipeline(); if (pipeline.context(this) != null) { pipeline.remove(this); } } returntrue; } returnfalse; }