10年工作经验程序猿一文讲懂netty的责任链设计模式Ne

Netty责任链设计模式

设计模式

责任链模式为请求创建了一个处理对象的链

发起请求和具体处理请求的过程进行解耦:职责链上的处理者负责处理请求,客户只需要将请求发送到职责链上,无需关心请求的处理细节和请求的传递.

实现责任链模式

实现责任链模式有4个要素:

  1. 处理器抽象类
  2. 具体的处理器实现类
  3. 保存处理器信息
  4. 处理执行

集合形式存储 --- 伪代码示例

//创建处理器抽象类
class AbstractHandler{
    void doHandler(Object args); 
}

// 处理器具体实现类
class Handler1 extends AbstractHandler {
    void doHandler(Object args){
        //handler
    }
}
class Handler2 extends AbstractHandler {
    void doHandler(Object args){
        //handler
    }
}
class Handler3 extends AbstractHandler {
    void doHandler(Object args){
        //handler
    }
}

//创建集合并存储所有处理器实例信息
List handlers = new ArrayList<AbstractHandler>()
handlers.add(new Handler1())
handlers.add(new Handler2())
handlers.add(new Handler3())

//调用处理器处理请求
void process(request) {
    for(AbstractHandler handler:handlers) {
        handler.doHandler(request);
    }
}

//发起请求,通过责任链处理请求
call.process(request)
复制代码

tomcat就是使用的这种责任链模式

链表形式调用 --- 伪代码示例

//处理器抽象类
class AbstractHandler {
    AbstractHandler next;
    void doHandler(Object args);
}

//处理器具体实现类
class Handler1 extends AbstractHandler {
    void doHandler(Object args){
        //handler
    }
}
class Handler2 extends AbstractHandler {
    void doHandler(Object args){
        //handler
    }
}
class Handler3 extends AbstractHandler {
    void doHandler(Object args){
        //handler
    }
}

//将处理器串成链表存储
pipeline = start [new Handler1() -> new Handler2() -> new Handler3()] end

//处理请求,从头到位调用处理器
void process(request) {
    handler = pipeline.findOne;
    while(handler != null){
        handler.doHandler(request);
        handler = handler.next();
    }
}
复制代码

netty就是使用的这种责任链模式,因此根据这种结构形式分析netty的链式调用,和上一种除了处理器的保存形式不同之外没差的.

自定义责任链模式

既然已经写出了链式存储的伪代码并要借此明确netty的调用规则,再次之前可以将这种方式用最简单的代码复刻出来便于后续理解,如下:

/**
 * @author daniel
 * @version 1.0.0
 * @date 2021/12/12
 */
public class PipelineDemo {

    public static void main(String[] args) {
        AbstractHandler handler1 = new Handler1();
        AbstractHandler handler2 = new Handler2();
        AbstractHandler handler3 = new Handler3();

        handler1.setNextHandler(handler2);
        handler2.setNextHandler(handler3);

        HandlerChainContext handlerChainContext = new HandlerChainContext(handler1);
        handlerChainContext.startRun("丁代光");
    }

}

/**
 * 处理器上下文,负责维护链表和链表中处理器的执行
 */
class HandlerChainContext {

    AbstractHandler currentHandler;

    /**
     * 初始化责任链头
     * @param currentHandler 责任链头
     */
    public HandlerChainContext(AbstractHandler currentHandler) {
        this.currentHandler = currentHandler;
    }

    /**
     * 责任链调用入口
     * @param args 请求信息
     */
    public void startRun(Object args) {
        currentHandler.doHandler(this, args);
    }

    /**
     * 执行下一个处理器
     * @param args 参数
     */
    public void runNext(Object args) {
        AbstractHandler nextHandler = currentHandler.getNextHandler();
        currentHandler = nextHandler;
        if (nextHandler == null) {
            System.out.println("结束");
            return;
        }
        nextHandler.doHandler(this, args);
    }
}

/**
 * 定义处理器抽象类
 */
abstract class AbstractHandler {

    AbstractHandler nextHandler;

    public AbstractHandler getNextHandler() {
        return nextHandler;
    }

    public void setNextHandler(AbstractHandler nextHandler) {
        this.nextHandler = nextHandler;
    }

    abstract void doHandler(HandlerChainContext handlerChainContext, Object args);
}

/**
 * 处理器实现类1
 */
class Handler1 extends AbstractHandler {

    @Override
    void doHandler(HandlerChainContext handlerChainContext, Object args) {
        args = args.toString() + " --- handler1";
        System.out.println("已经经过Handler1处理器的处理,处理结果:" + args);
        //执行下一个处理器
        handlerChainContext.runNext(args);
    }
}

class Handler2 extends AbstractHandler {

    @Override
    void doHandler(HandlerChainContext handlerChainContext, Object args) {
        args = args.toString() + " --- handler2";
        System.out.println("已经经过Handler2处理器的处理,处理结果:" + args);
        //执行下一个处理器
        handlerChainContext.runNext(args);
    }
}
class Handler3 extends AbstractHandler {

    @Override
    void doHandler(HandlerChainContext handlerChainContext, Object args) {
        args = args.toString() + " --- handler3";
        System.out.println("已经经过Handler3处理器的处理,处理结果:" + args);
        //执行下一个处理器
        handlerChainContext.runNext(args);
    }
}
复制代码

执行结果,如下:

已经经过Handler1处理器的处理,处理结果:丁代光 --- handler1
已经经过Handler2处理器的处理,处理结果:丁代光 --- handler1 --- handler2
已经经过Handler3处理器的处理,处理结果:丁代光 --- handler1 --- handler2 --- handler3
结束
复制代码

这是严格按照上面伪代码实现的一个简单的链表存储的责任链模式,当然netty中药比这复杂也更安全的多,虽然简单但是对netty的责任链模式的理解一定大有裨益.

Netty中的ChannelPipeline责任链

  1. 在Netty中Pipeline管道保存了通道所有处理器信息.

  2. 这个管道是在创建Channel时自动创建的一个专有的pipeline,也就是在bind端口时使用initAndRegister方法时生成的,如下:

    channel = channelFactory.newChannel();
    复制代码

    ++通过debug以及上文可以明确知道现在创建的是NioServerSocketChanel,那么就可以直接去查看其构造方法++,如下:

    /**
     * Create a new instance
     */
    public NioServerSocketChannel() {
        this(newSocket(DEFAULT_SELECTOR_PROVIDER));
    }
    复制代码
    /**
     * Create a new instance using the given {@link ServerSocketChannel}.
     */
    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }
    复制代码

    ++之后又调用其父类AbstractNioChannel的构造方法继续构造++,如下:

    /**
     * Create a new instance
     *
     * @param parent            the parent {@link Channel} by which this instance was created. May be {@code null}
     * @param ch                the underlying {@link SelectableChannel} on which it operates
     * @param readInterestOp    the ops to set to receive data from the {@link SelectableChannel}
     */
    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);  //设置为非阻塞模式
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                logger.warn(
                            "Failed to close a partially initialized socket.", e2);
            }
    
            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }
    复制代码

    ++从这个方法将nio设置为非阻塞模式,已经逐步走上正轨,开始调用jdk提供的nio框架了,继续调用其父类AbstractChannel的构造方法++,如下:

    /**
     * Creates a new instance.
     *
     * @param parent
     *        the parent of this channel. {@code null} if there's no parent.
     */
    protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline(); //创建Pipeline管道
    }
    复制代码

    ++到此为止可以说是已经创建通道完成,但是我们要知道的是Pipeline管道,因此可以继续查看newChannelPipeline方法++,如下:

    /**
     * Returns a new {@link DefaultChannelPipeline} instance.
     */
    protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this); //创建一个DefaultChannelPipeline对象,这就是管道对象,负责链路管理
    }
    复制代码
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);
    
        tail = new TailContext(this);  //创建链表尾部
        head = new HeadContext(this);  //创建链表头部
    
        //中间尚未有处理器,到此仅仅头尾相连
        head.next = tail;
        tail.prev = head;
    }
    复制代码

    ++现在看来是不是和简单的自定义demo特别像了,只是DefaultChannelPipeline管道中没有任何一个处理器,当然也不尽然,HeadContext和TailContext都是处理器,只不过是一个特殊的处理器.++

    还有什么处理器可以在这这个链路上我们慢慢来看......

  3. 入站事件(write)和出站操作(read)都会调用pipeline上的处理器.

之后我们需要处理的事情就是:

  1. 入站事件和出站操作是什么意思?
  2. Handler处理器有什么,有什么作用?
  3. Pipeline是如何维护Handler的?
  4. handler的执行流程是怎样的?

入站事件和出站事件

入站事件通常是指I/O线程生成了入站数据,比如EventLoop收到selector的OP_READ事件,入站处理器调用socketChannel.read(ByteBuffer)接收到数据后,这将导致通道的ChannelPipeline中包含的下一个中的channelRead方法被调用.总的来说就是socket更底层有数据来了自动适配相应的入站处理器去处理.

nioData -> nettyInHandler:netty层处理数据

出站事件经常是指I/O线程执行实际的输出操作,比如bind方法用意是请求server socket绑定到给定的SocketAddress,这将导致通道的ChannelPipeline中包含的下一个出站处理器中的bind方法被调用,总的来说就是手动调用合适的出站处理器去处理一个数据发送到更底层的socket逻辑.

nettyOutHandler -> nio:netty层发送数据到更底层进行处理

针对这两种种类的事件,netty提供了更多具体的事件,如下:

  • 入站事件(inbound)
    • fireChannelRegistered:channel注册事件
    • fireChannelUnregistered:channel接除注册事件
    • fireChannelActive:channel活跃事件
    • fireChannelInactive:channel非活跃事件
    • fireExceptionCaught:异常事件
    • fireUserEventTriggered:用户自定义事件
    • fireChannelRead:channel读事件
    • fireChannelReadComplete:channel读完成事件
    • fireChannelWritabilityChanged:channel写状态变化事件
  • 出站事件(outbound)
    • bind:端口绑定事件
    • connect:连接事件
    • disconnect:断开连接事件
    • close:关闭事件
    • derigister:接触注册事件
    • flush:刷新数据到网络事件
    • read:读事件,用于注册OP_READ到selector
    • write:写事件
    • writeAndFlush:写出数据事件

事件描述到此为止,后面通过查看源码去详细的了解事件机制,从开头到结尾除了事件之外这篇文章最常见的就是处理器,也知道了netty中处理器是如何存储的了(pipeline),那么除了头HeadContext和尾TailContext处理器之外还有其他什么处理器?

Pipeline中的handler是什么?

所有的handler都源自一个处理器顶级接口ChannelHandler,用于处理IO事件和拦截IO事件并转发到ChannelPipeline中的下一个处理器,如下:

package io.netty.channel;

import io.netty.util.Attribute;
import io.netty.util.AttributeKey;

import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Inherited;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
public interface ChannelHandler {
    void handlerAdded(ChannelHandlerContext ctx) throws Exception;
    void handlerRemoved(ChannelHandlerContext ctx) throws Exception;
    @Deprecated
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
    @Inherited
    @Documented
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface Sharable {
        // no value
    }
}
复制代码

可以看到这个顶级接口定义的功能很弱,实际使用时还会去实现两大子接口:

  • ChannelInboundHandler:处理入站IO事件
  • ChannelOutboundHandler:处理出站IO事件

这和上面的事件就对应上了,也就是说事件在处理器中使用.

上面已经有了入站接口和出站接口,为了开发方便,避免所有handler去实现一遍接口方法,Netty提供了一些简单的适配实现类:

  • ChannelInboundHandlerAdapter:处理入站IO事件
  • ChannelOutboundHandlerAdapter:处理入站IO事件
  • ChannelDuplexHandler:支持同时处理入站和出站事件

==和自定义责任链模式一样,Netty中实际存储在Pipeline中的对象并不是ChannelHandler,而是上下文对象ChannelHandlerContext,它将handler包裹在上下文对象中,通过上下文对象与它所属的ChannelPipeline交互,向上或向下传递事件或者修改pipeline都是通过这个上下文对象进行的.==

到此为止已经知道了处理器是什么以及在处理器中执行的入站事件和出站事件,那么Netty是如何维护这么一整条ChannelHandler链路的呢?

如何维护Pipeline中的handler

一个前提是ChannelPipeline是线程安全的,也就是说可以在任何时候或添加或删除或替换一个ChannelHandler,一般的操作是初始化的时候将handler增加进去,或许会存在删除处理器的情况,但是非常少,并且Netty提供了丰富的针对Pipeline管理handler的api(往前看在Defau...Pipeline中),如下:

  • addFirst:在最前面插入
  • addLast:在最后面插入
  • addBefore:插入到指定处理器前面
  • addAfter:插入到指定处理器后面
  • remove:移除指定处理器
  • removeFirst:移除第一个处理器
  • removeLast:移除最后一个处理器
  • replace:替换指定处理器

说了这么多从理论上已经解决了三个问题,先看源码吧...否则一切都是理论上的空谈...当然看了源码也没卵用这么多api直接用就好了,只不过是为了忽悠XX面试官...

处理器+事件源码

签名已经查看了顶级接口的代码,代码很少,方法很少,下面查看其两大子接口代码:

ChannelInboundHandler

 */
package io.netty.channel;

/**
 * {@link ChannelHandler} which adds callbacks for state changes. This allows the user
 * to hook in to state changes easily.
 */
public interface ChannelInboundHandler extends ChannelHandler {

    void channelRegistered(ChannelHandlerContext ctx) throws Exception;

    void channelUnregistered(ChannelHandlerContext ctx) throws Exception;

    void channelActive(ChannelHandlerContext ctx) throws Exception;

    void channelInactive(ChannelHandlerContext ctx) throws Exception;

    void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception;

    void channelReadComplete(ChannelHandlerContext ctx) throws Exception;

    void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception;

    void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception;

    @Override
    @SuppressWarnings("deprecation")
    void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception;
}
复制代码

ChannelOutboundHandler

package io.netty.channel;

import java.net.SocketAddress;

/**
 * {@link ChannelHandler} which will get notified for IO-outbound-operations.
 */
public interface ChannelOutboundHandler extends ChannelHandler {
    
    void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception;

    void connect(
            ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception;

    void disconnect(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception;

    void read(ChannelHandlerContext ctx) throws Exception;

    void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception;

    void flush(ChannelHandlerContext ctx) throws Exception;
}
复制代码

从顶级接口ChannelHandler中扩展出的两大子接口分别定义了入站事件和出站事件相应的方法,没有特殊需求这些事件足够涵盖我们开发过程中用到的所有的事件了.

除非我们必须自定义处理器,否则这三个接口对我们的作用都不大,但是我们要知道这三个接口的存在已经能做什么,大多数情况下我们应该使用的都是Netty提供的便于开发的适配器,如下:

ChannelInboundHandlerAdapter

package io.netty.channel;

import io.netty.channel.ChannelHandlerMask.Skip;

public class ChannelInboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelInboundHandler {

    @Skip
    @Override
    public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelRegistered();
    }

    @Skip
    @Override
    public void channelUnregistered(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelUnregistered();
    }

    @Skip
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelActive();
    }

    @Skip
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelInactive();
    }

    @Skip
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        ctx.fireChannelRead(msg);
    }

    @Skip
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelReadComplete();
    }

    @Skip
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        ctx.fireUserEventTriggered(evt);
    }

    @Skip
    @Override
    public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception {
        ctx.fireChannelWritabilityChanged();
    }

    @Skip
    @Override
    @SuppressWarnings("deprecation")
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
            throws Exception {
        ctx.fireExceptionCaught(cause);
    }
}
复制代码

ChannelOutboundHandlerAdapter

package io.netty.channel;

import io.netty.channel.ChannelHandlerMask.Skip;

import java.net.SocketAddress;

public class ChannelOutboundHandlerAdapter extends ChannelHandlerAdapter implements ChannelOutboundHandler {

    @Skip
    @Override
    public void bind(ChannelHandlerContext ctx, SocketAddress localAddress,
            ChannelPromise promise) throws Exception {
        ctx.bind(localAddress, promise);
    }

    @Skip
    @Override
    public void connect(ChannelHandlerContext ctx, SocketAddress remoteAddress,
            SocketAddress localAddress, ChannelPromise promise) throws Exception {
        ctx.connect(remoteAddress, localAddress, promise);
    }

    @Skip
    @Override
    public void disconnect(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.disconnect(promise);
    }

    @Skip
    @Override
    public void close(ChannelHandlerContext ctx, ChannelPromise promise)
            throws Exception {
        ctx.close(promise);
    }

    @Skip
    @Override
    public void deregister(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception {
        ctx.deregister(promise);
    }

    @Skip
    @Override
    public void read(ChannelHandlerContext ctx) throws Exception {
        ctx.read();
    }

    @Skip
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        ctx.write(msg, promise);
    }

    @Skip
    @Override
    public void flush(ChannelHandlerContext ctx) throws Exception {
        ctx.flush();
    }
}
复制代码

ChannelDuplexHandler

public class ChannelDuplexHandler extends ChannelInboundHandlerAdapter implements ChannelOutboundHandler {}
复制代码

可以看到这三个适配器已经帮助我们对接口进行了实现,我们只需要使用就可以了,即使我们需要对事件进行增强,我们也只需要继承自这三个适配器单独的增强单个方法,而没必要实现顶级接口去手动实现所有方法.

从三个适配器中可以看到一个贯穿全文的类ChannelHandlerContext,这个类充当是上下文的作用,维护一整条处理器链路,它仅仅是一个接口.我们可以分析其一个默认的实现类DefaultChannelHandlerContext,如下:

DefaultChannelHandlerContext

package io.netty.channel;

import io.netty.util.concurrent.EventExecutor;

/**
 * 默认的上下文实现类
 */
final class DefaultChannelHandlerContext extends AbstractChannelHandlerContext {

    private final ChannelHandler handler;  //上下文中存储的需要执行的处理器

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, handler.getClass());
        this.handler = handler; //在创建上下文对象的时候传进来的处理器
    }

    @Override
    public ChannelHandler handler() {
        return handler;
    }
}
复制代码

正如自定义责任链一样,在上下文中存储一个处理器,通过这个上下文去执行这个处理器,猛一看不对劲啊...不是实现类吗?接口呢?处理器链路的上下级关系呢?别着急看其父类AbstractChannelHandlerContext

AbstractChannelHandlerContext

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    volatile AbstractChannelHandlerContext next; //下一个处理器
    volatile AbstractChannelHandlerContext prev; //上一个处理器
}
复制代码

这不就来了嘛,可以看到AbstractChannelHandlerContext实现了ChannelHandlerContext接口,并且其中有两个变量nextprev用于维护链路的上下级关系

==产生了一个疑问,不应该是一个上下文对象就可以维护一整条链路了吗?为甚了这儿的上下级关系用的还是上下文对象存储?==

++猜一下不对的话后面再纠正,我们在自定义的时候在处理器汇总使用了next存储下一个处理器,而在netty中的处理器,每一个处理器都不设计其他任何一个处理器,所以需要使用多个上下文对象来处理链路前后关系,这是猜的,不一定对...++

继续看怎么结合Pipeline使用的吧,阅读默认的管道实现类DefaultChannelPipeline,它实现了ChannelPipeline接口,那么必然有对处理器链路操作的api实现,如下:

/**
 * 默认管道实现类
 */
public class DefaultChannelPipeline implements ChannelPipeline {
    final AbstractChannelHandlerContext head; //头对象
    final AbstractChannelHandlerContext tail; //尾对象
    
    protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);  //创建链表尾部
        head = new HeadContext(this);  //创建链表头部

        //中间尚未有处理器,到此仅仅头尾相连
        head.next = tail;
        tail.prev = head;
    }
    
    @Override
    public final ChannelPipeline addFirst(String name, ChannelHandler handler) {
        return addFirst(null, name, handler);
    }
    
    @Override
    public final ChannelPipeline addFirst(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);
            name = filterName(name, handler);

            newCtx = newContext(group, name, handler); //创建新的上下文对象

            addFirst0(newCtx); //实际处理上下文过程

            // If the registered is false it means that the channel was not registered on an eventLoop yet.
            // In this case we add the context to the pipeline and add a task that will call
            // ChannelHandler.handlerAdded(...) once the channel is registered.
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }

            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                callHandlerAddedInEventLoop(newCtx, executor);
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
    
    /**
     * 实际处理上下文增加到链路第一个的过程
     * @param newCtx 新建的处理器上下文对象
     */
    private void addFirst0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext nextCtx = head.next;  //当前第一个
        newCtx.prev = head; //前一个为头
        newCtx.next = nextCtx; //后一个为当前第一个
        head.next = newCtx; //头的下一个是新建的上下文对象
        nextCtx.prev = newCtx; //当前第一个的前一个是新建的上下文对象
    }
}
复制代码

可以看到pipeline在初始化的时候就已经初始化好了头部和尾部,但却不是处理器而是上下文对象,在api中也是将处理器封装成上下文对象进行处理,那么我们的猜测是正确的,也就是
一个上下文pipeline对象中存储着一个两个特殊的context对象,有了这两个context对象就能将一串的context对象串起来了(context对象中prev和next),而实际上串起来的却是处理器,这样做的好处就是任何一个处理器对象都可以在任何地方使用,因为这个处理器对象中不设计其他处理器的任何信息,非常的纯粹,在其他地方使用时再封装一个上下文对象就好了,妙啊......

把前面的三个问题整合一下画个图,如下:

ChannelPipeline.png

这样前三个问题无论从理论还是源码方面都解决了,接下来分析handler的执行流程是怎样的

handler的执行分析

如上图,即使管道中有多重处理器(既有入站也有出站),在执行具体事件的时候Pipeline会自动选择在入站事件中不会使用出站处理器(125),在出站事件中不会使用入站处理器(521).

在封装处理器的上下文对象中,以fire开头的方法代表入站事件的传播和处理,其余方法代表出站事件的传播和处理.

分析registered入站事件的处理

registered事件在nio中代表通道(channel)和选择器(selector)的绑定,在netty中同理代表将通道(Channel)和选择器(EventLoop)绑定,比如bind方法

首先需要说明的是,管道是在创建Channel时自动创建的一个专有的pipeline,在initAndRegister方法中使用,在通道创建完成之后通过ServerBootStrap中的init方法对通道进行初始化,如下:

ServerBootstrap

@Override
void init(Channel channel) {
    setChannelOptions(channel, newOptionsArray(), logger);
    setAttributes(channel, newAttributesArray());

    ChannelPipeline p = channel.pipeline(); //获取通道中pipeline对象

    final EventLoopGroup currentChildGroup = childGroup;
    final ChannelHandler currentChildHandler = childHandler;
    final Entry<ChannelOption<?>, Object>[] currentChildOptions = newOptionsArray(childOptions);
    final Entry<AttributeKey<?>, Object>[] currentChildAttrs = newAttributesArray(childAttrs);

    //管道最后增加一个一个处理器ChannelInitializer,通过ChannelInitializer的注释可以知道这是一个特殊的处理器,它用于初始化通道,只执行一次然后销毁
    p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) { //触发ChannelInitializer时,收到注册成功的事件后,就会执行initChannel方法
            final ChannelPipeline pipeline = ch.pipeline();
            ChannelHandler handler = config.handler();
            if (handler != null) {
                pipeline.addLast(handler);
            }

            ch.eventLoop().execute(new Runnable() {
                @Override
                public void run() {
                    pipeline.addLast(new ServerBootstrapAcceptor(
                            ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                }
            });
        }
    });
}
复制代码

==pipeline在增加ChannelInitializer处理器之前时责任链中只包含两个处理器,HeadContext和TailContext,也就是HeadContext->TailContext,之后通过p.addLast()增加了一个处理器ChannelInitializer==,如下:

public abstract class ChannelInitializer<C extends Channel> extends ChannelInboundHandlerAdapter {}
复制代码

首先可以看到ChannelInitializer是一个入站事件处理器,从这个处理器中我们重点关注initChannel,channelRegistered和handlerAdded,如下:

@Override
@SuppressWarnings("unchecked")
public final void channelRegistered(ChannelHandlerContext ctx) throws Exception {
    // Normally this method will never be called as handlerAdded(...) should call initChannel(...) and remove
    // the handler.
    //如果handlerAdded(...)方法被执行了,channelRegistered这个方法理论上不会再被调用
    if (initChannel(ctx)) { //因为initChannel方法被调用之后,这个handler就会被移除
        // we called initChannel(...) so we need to call now pipeline.fireChannelRegistered() to ensure we not
        // miss an event.
        ctx.pipeline().fireChannelRegistered();

        // We are done with init the Channel, removing all the state for the Channel now.
        removeState(ctx);
    } else {
        // Called initChannel(...) before which is the expected behavior, so just forward the event.
        ctx.fireChannelRegistered();
    }
}

/**
 * {@inheritDoc} If override this method ensure you call super!
 */
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
    if (ctx.channel().isRegistered()) { //如果当前通道已经被注册则调用初始化方法
        // This should always be true with our current DefaultChannelPipeline implementation.
        // The good thing about calling initChannel(...) in handlerAdded(...) is that there will be no ordering
        // surprises if a ChannelInitializer will add another ChannelInitializer. This is as all handlers
        // will be added in the expected order.
        if (initChannel(ctx)) {

            // We are done with init the Channel, removing the initializer now.
            removeState(ctx); //移除处理器
        }
    }
}

/**
 * 通道初始化
 */
@SuppressWarnings("unchecked")
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
    if (initMap.add(ctx)) { // Guard against re-entrance.
        try {
            initChannel((C) ctx.channel()); //这个init方法一般就是创建channel时,实现的那个initChannel方法
        } catch (Throwable cause) {
            // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
            // We do so to prevent multiple calls to initChannel(...).
            exceptionCaught(ctx, cause);
        } finally {
            ChannelPipeline pipeline = ctx.pipeline();
            if (pipeline.context(this) != null) {
                pipeline.remove(this); //ChannelInitializer执行结束之后,会把自己从pipeline中删除掉,避免重复初始化
            }
        }
        return true;
    }
    return false;
}
复制代码

channelRegistered方法中有一段值得注意的注释,如果handlerAdd(...)被执行了,channelRegistered方法理论上不会再被调用,这是为什么呢?

查看handlerAdd方法会发现如果通道已经被初始化完成会从责任链中移除这个处理器,那上面的channelRegistered方法确实不会再被调用了.

无论是channelRegistered还是handlerAdd都会执行initChannel(ChannelHandlerContext),在这个方法中调用了initChannel((C) ctx.channel());,这个方法就是我们在执行p.addLast()时重写的代码,并且在initChannel(ChannelHandlerContext)中也能找到移除当前处理器的代码pipeline.remove(this)

++从ChannelInitializer的分析中我们也能够体会到责任链中处理器动态添加和动态删除的妙用.++

==需要注意的是在这儿仅仅是增加了一个处理器,虽然还没有执行,但是执行完p.addLast()方法之后我们责任链中的处理器已经发生了变化,也就是HeadContext->ChannelInitializer(匿名)->TailContext.==

++到此我们的initAndRegister()发放中的init()方法也就执行完成了.++

之后我们查看register方法,在前面我们已经知道register方法最终实际调用的就是AbstractChannel.register,如下:

@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
    //eventLoop ---> 选择器
    //promise --->通道
    ObjectUtil.checkNotNull(eventLoop, "eventLoop");
    if (isRegistered()) {
        promise.setFailure(new IllegalStateException("registered to an event loop already"));
        return;
    }
    if (!isCompatible(eventLoop)) {
        promise.setFailure(
                new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
        return;
    }

    AbstractChannel.this.eventLoop = eventLoop;

    //判断调用绑定的方法的线程是不是选择器线程,如果不是以线程的方式提交
    if (eventLoop.inEventLoop()) {
        //实际绑定过程
        register0(promise);
    } else {
        try {
            //一旦触发了任务提交,eventLoop就会开始正式执行工作,轮询
            eventLoop.execute(new Runnable() {
                @Override
                public void run() {
                    register0(promise);
                }
            });
        } catch (Throwable t) {
            logger.warn(
                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                    AbstractChannel.this, t);
            closeForcibly();
            closeFuture.setClosed();
            safeSetFailure(promise, t);
        }
    }
}
复制代码

其中AbstractChannel.this.eventLoop = eventLoop;拿到EventLoop对象,eventLoop.inEventLoop()判断当前EventLoop是否正在执行,这还没绑定完成呢当然没有执行,所有肯定会以任务的形式提交注册方法到EventLoop,如下:

eventLoop.execute(new Runnable() {
    @Override
    public void run() {
        register0(promise);
    }
});
复制代码

事先说明一下EventLoop中是没有线程的,只有当任务执行的时候才会通过线程创建器创建线程,这个就不细说了,线程模型中已经说的很清楚,我们重点关注register0(promise),如下:

private void register0(ChannelPromise promise) {
    try {
        // check if the channel is still open as it could be closed in the mean time when the register
        // call was outside of the eventLoop
        if (!promise.setUncancellable() || !ensureOpen(promise)) {
            return;
        }
        boolean firstRegistration = neverRegistered;
        doRegister();  //NioChannel中,将Channel与NioEventLoop进行绑定 --- 实际就是nio此等代码调用

        //到此为止注册完成


        neverRegistered = false;
        registered = true;

        // Ensure we call handlerAdded(...) before we actually notify the promise. This is needed as the
        // user may already fire events through the pipeline in the ChannelFutureListener.
        pipeline.invokeHandlerAddedIfNeeded(); //触发HandlerAdd方法 --- 他来了

        safeSetSuccess(promise);
        pipeline.fireChannelRegistered();  //传播通道注册完成事件
        // Only fire a channelActive if the channel has never been registered. This prevents firing
        // multiple channel actives if the channel is deregistered and re-registered.
        if (isActive()) {
            if (firstRegistration) {
                pipeline.fireChannelActive();
            } else if (config().isAutoRead()) {
                // This channel was registered before and autoRead() is set. This means we need to begin read
                // again so that we process inbound data.
                //
                // See https://github.com/netty/netty/issues/4805
                beginRead();
            }
        }
    } catch (Throwable t) {
        // Close the channel directly to avoid FD leak.
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
    }
}
复制代码

注册完成之前的代码不必细说,现在我们研究pipeline.invokeHandlerAddedIfNeeded();,从这儿开始通道就开始正式执行处理器中的事件了,这是一个HandlerAdd事件,当前我们又3个处理器,只有第二个处理器有相应的执行方法,所以一定是执行的第二个处理器中的方法,这就绕回去了吧,这段代码的底层我们无需关心,debug进去点点点就可以找到.

我们重点还是分析第二个处理器的执行吧,使用handlerAdded方法执行initChannel(ctx),执行完成之后执行removeState(ctx);,因此执行完pipeline.invokeHandlerAddedIfNeeded();之后pipeline中又只剩下HeadContext和TailContext两个处理器了,但是事实真的是这样的吗?

handlerAdded方法执行initChannel(ctx),会执行我们添加处理器时实现的抽象方法,我们好像一直忽略了我们增加的处理器的抽象方法具体是干什么的了?细看如下:

@Override
public void initChannel(final Channel ch) { //触发ChannelInitializer时,收到注册成功的事件后,就会执行initChannel方法
    final ChannelPipeline pipeline = ch.pipeline(); //pipeline对象
    ChannelHandler handler = config.handler(); //从配置中读取处理器
    if (handler != null) {
        pipeline.addLast(handler); //配置中的处理器不为空,增加处理器
    }

    ch.eventLoop().execute(new Runnable() {  //最后又增加一个任务增加ServerBootstrapAcceptor处理器
        @Override
        public void run() {
            pipeline.addLast(new ServerBootstrapAcceptor(
                    ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        }
    });
}
复制代码

config.handler()是什么?...其实就是我们在执行bing方法之前增加的配置,如下:

b.group(bossGroup, workerGroup)
 .channel(NioServerSocketChannel.class)
 .option(ChannelOption.SO_BACKLOG, 100)
 .handler(new LoggingHandler(LogLevel.INFO))
 .childHandler(new ChannelInitializer<SocketChannel>() {
     @Override
     public void initChannel(SocketChannel ch) throws Exception {
         ChannelPipeline p = ch.pipeline();
         p.addLast(serverHandler);
     }
 });
复制代码

从该配置中查询到一个处理器LoggingHandler,那么执行完initChannel方法之后,责任链中一定出现此处理器,并且LoggingHandler继承自ChannelDuplexHandler,是一个聚合处理器,无论是出站事件还是入站事件都会执行此处理器中的方法,没什么质疑的,这就是一个记录日志的.

并且最后又增加了一个处理器ServerBootstrapAcceptor.

==也就是说执行完pipeline.invokeHandlerAddedIfNeeded();之后,责任链中当前有4个处理器,也就是HeadContext->LoggingHandler->ServerBootstrapAcceptor->TailContext,当然在删除原来处理器之前应该是5个的.==

==但是,当方法执行完成后在责任链中没有找到ServerBootstrapAcceptor处理器...这...尴尬,继续看吧没找到==

之后执行的方法是pipeline.fireChannelRegistered();用于传播通道注册完成事件,这个事件就很简单了,遍历所有处理器去执行register事件,直到处理器不再调用下一个处理器,也就是TailContext.

==debug一下才突然想起来注册事件的通知是异步的,在提交任务之后就是另一个线程在处理这件事情了,确实是新增了两个处理器.==

到此为止Register入站事件就完成了,之后就是bind出站事件了.

分析bind出站事件的处理器

真正执行端口绑定的代码,如下:

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {  //想NioEventLoop提交任务进行端口绑定
            //注册完成并且成功
            if (regFuture.isSuccess()) {
                //绑定
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}
复制代码

这个方法会将注册任务以异步的形式发布到EventLoop上面去执行,重点关注channel.bind(...)调用,实际调用AbtractChannel.bind,如下:

@Override
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    //触发一个netty职责链中的绑定事件,由应用层代码发起到底层,属于outBound
    return pipeline.bind(localAddress, promise);
}
复制代码

如果debug的话就会发现到此一定是由4个处理器在pipeline中,继续查看pipeline.bind(localAddress, promise),如下:

@Override
public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
    return tail.bind(localAddress, promise);
}
复制代码

可以看到是否通过tail开始调用bind出站事件的,这是出站事件的特性,从尾部开始调用,如果没有重写bind事件方法的话一定会调用AbstractChannelHandlerContext.bind方法,如下:

@Override
public ChannelFuture bind(final SocketAddress localAddress, final ChannelPromise promise) {
    ObjectUtil.checkNotNull(localAddress, "localAddress");
    if (isNotValidPromise(promise, false)) {
        // cancelled
        return promise;
    }

    final AbstractChannelHandlerContext next = findContextOutbound(MASK_BIND); //查找下一个出站事件处理器
    EventExecutor executor = next.executor();
    if (executor.inEventLoop()) {
        next.invokeBind(localAddress, promise); //事件执行
    } else {
        safeExecute(executor, new Runnable() {
            @Override
            public void run() {
                next.invokeBind(localAddress, promise);
            }
        }, promise, null, false);
    }
    return promise;
}
复制代码

查找下一个出站处理器,其实应该是查找上一个出站处理器,这儿是反着的,如果找到的话就会调用next.invokeBind(...)方法去执行,如下:

private void invokeBind(SocketAddress localAddress, ChannelPromise promise) {
    if (invokeHandler()) {
        try {
            ((ChannelOutboundHandler) handler()).bind(this, localAddress, promise); //实际调用处理器中的bind方法
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    } else {
        bind(localAddress, promise);
    }
}
复制代码

在上述责任链中,调用的一定是LoggingHandler中的bind方法,是否继续往下查找由处理器中的代码决定,直到没有bind出站事件传播为止,在这儿也就是HeadContext中的bind方法,如下:

@Override
public void bind(
        ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) {
    unsafe.bind(localAddress, promise);
}
复制代码

这儿的unsafe是什么,我们可以查看其构造方法,如下:

HeadContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, HEAD_NAME, HeadContext.class);
    unsafe = pipeline.channel().unsafe();
    setAddComplete();
}
复制代码

可以看到unsafe其实就是channel中的unsafe对象,实际调用的是AbstractChannel.bind方法,如下:

@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
    assertEventLoop();

    if (!promise.setUncancellable() || !ensureOpen(promise)) {
        return;
    }

    // See: https://github.com/netty/netty/issues/576
    if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
        localAddress instanceof InetSocketAddress &&
        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
        !PlatformDependent.isWindows() && !PlatformDependent.maybeSuperUser()) {
        // Warn a user about the fact that a non-root user can't receive a
        // broadcast packet on *nix if the socket is bound on non-wildcard address.
        logger.warn(
                "A non-root user can't receive a broadcast packet if the socket " +
                "is not bound to a wildcard address; binding to a non-wildcard " +
                "address (" + localAddress + ") anyway as requested.");
    }

    boolean wasActive = isActive();
    try {
        doBind(localAddress);
    } catch (Throwable t) {
        safeSetFailure(promise, t);
        closeIfClosed();
        return;
    }

    if (!wasActive && isActive()) {
        invokeLater(new Runnable() {
            @Override
            public void run() {
                pipeline.fireChannelActive();
            }
        });
    }

    safeSetSuccess(promise);
}
复制代码

我们只需要关注其中的doBind(localAddress);执行调用,我们知道此时的通道是NioServerSocketChannel,那么可以直接去找其对应的doBind方法,如下:

@SuppressJava6Requirement(reason = "Usage guarded by java version check")
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
    if (PlatformDependent.javaVersion() >= 7) {
        javaChannel().bind(localAddress, config.getBacklog());
    } else {
        javaChannel().socket().bind(localAddress, config.getBacklog());
    }
}
复制代码

总算绕出来了,最终调用了jdk-nio的底层逻辑进行端口绑定.

到此为止,服务端可以使用了.

在执行注册入站事件的时候在链路中新增了一个处理器ServerBootstrapAcceptor,一直没有用到,它实际上是连接入站处理器,在执行accept入站事件的时候一定会用到,不妨分析一下.

分析accept入站事件的处理

到现在为止,服务端已经启动完成,EventLoop中已经有事件开始轮询,当EventLoop轮询到accept事件之后就会开始accept入站事件的传播,这是一个前提应该得明白,其实和nio轮询selector是一模一样的,netty做的仅仅是封装.

我们可以直接查看NioEventLoop的run方法,run方法的作用只有两个,不断的轮询查询事件和task任务,我们重点关注处理事件的方法,如下:

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        //携带选择器中的事件去轮询,这个选择器是初始化NioEventLoop的时候生成的,害为了这个看了两遍前面的
        //但是这个选择器和通道绑定后才能生效使用(执行这些事件的时候肯定已经绑定了)
        //从绑定了通道的selector中读取事件
        processSelectedKeysPlain(selector.selectedKeys());
    }
}
复制代码
/**
 * 处理事件
 * @param selectedKeys 事件集合
 */
private void processSelectedKeysPlain(Set<SelectionKey> selectedKeys) {
    // check if the set is empty and if so just return to not create garbage by
    // creating a new Iterator every time even if there is nothing to process.
    // See https://github.com/netty/netty/issues/597
    if (selectedKeys.isEmpty()) { //没有事件
        return;
    }

    //遍历查询结果
    Iterator<SelectionKey> i = selectedKeys.iterator();
    //轮询
    for (;;) {
        //被封装的事件
        final SelectionKey k = i.next();
        //获取事件对应通道
        final Object a = k.attachment();
        i.remove(); //从校核中移除事件

        if (a instanceof AbstractNioChannel) { //判断是否是netty通道
            //处理单个事件
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }

        if (!i.hasNext()) {
            break;
        }

        if (needsToSelectAgain) {
            selectAgain();
            selectedKeys = selector.selectedKeys();

            // Create the iterator again to avoid ConcurrentModificationException
            if (selectedKeys.isEmpty()) {
                break;
            } else {
                i = selectedKeys.iterator();
            }
        }
    }
}
复制代码
/**
 * 处理单个事件
 * @param k
 * @param ch
 */
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    //k ---> 被封装的事件
    //ch ---> 获取事件对应netty通道
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();

    if (!k.isValid()) { //判断此事件是否有效(无效进入此逻辑关闭unsafe)
        final EventLoop eventLoop;
        try {
            eventLoop = ch.eventLoop();  //获取nio事件执行器
        } catch (Throwable ignored) {
            // If the channel implementation throws an exception because there is no event loop, we ignore this
            // because we are only trying to determine if ch is registered to this event loop and thus has authority
            // to close ch.
            return;
        }
        // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
        // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
        // still healthy and should not be closed.
        // See https://github.com/netty/netty/issues/5125
        if (eventLoop == this) {
            // close the channel if the key is not valid anymore
            unsafe.close(unsafe.voidPromise());
        }
        return;
    }

    try {
        int readyOps = k.readyOps(); //获取事件类型
        // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
        // the NIO JDK channel implementation may throw a NotYetConnectedException.
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) { //connect事件主要用于客户端
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }

        // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            //write事件在这儿处理
            ch.unsafe().forceFlush();
        }

        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            //accept和read都在这儿进行处理(服务端重点关注)
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}
复制代码

这三个方法在说明netty的线程模型的时候应该已经不陌生了,我们重点关注unsafe.read()方法,这儿才是真正的处理连接的方法,同理unsafe是通道中的unsafe,因此我们直接去查看其NioMessageUnsafe.read()方法,如下:

@Override
public void read() {
    assert eventLoop().inEventLoop();
    final ChannelConfig config = config();
    final ChannelPipeline pipeline = pipeline();
    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
    allocHandle.reset(config);

    boolean closed = false;
    Throwable exception = null;
    try {
        try {
            do {
                int localRead = doReadMessages(readBuf);
                if (localRead == 0) {
                    break;
                }
                if (localRead < 0) {
                    closed = true;
                    break;
                }

                allocHandle.incMessagesRead(localRead);
            } while (continueReading(allocHandle));
        } catch (Throwable t) {
            exception = t;
        }

        int size = readBuf.size();
        for (int i = 0; i < size; i ++) {
            readPending = false;
            pipeline.fireChannelRead(readBuf.get(i));
        }
        readBuf.clear();
        allocHandle.readComplete();
        pipeline.fireChannelReadComplete();

        if (exception != null) {
            closed = closeOnReadError(exception);

            pipeline.fireExceptionCaught(exception);
        }

        if (closed) {
            inputShutdown = true;
            if (isOpen()) {
                close(voidPromise());
            }
        }
    } finally {
        // Check if there is a readPending which was not processed yet.
        // This could be for two reasons:
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
        // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
        //
        // See https://github.com/netty/netty/issues/2254
        if (!readPending && !config.isAutoRead()) {
            removeReadOp();
        }
    }
}
复制代码

首先我们从方法中看到一个调用是int localRead = doReadMessages(readBuf);,read方法我们需要重点关注,这个方法AbstractNioMessageChannle中只给出了抽象方法,那我我们可以直接去找其子类NioServerSocketChannel,如下:

@Override
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(javaChannel()); //从连接中获取jdk中的SocketChannel对象

    try {
        if (ch != null) {
            buf.add(new NioSocketChannel(this, ch)); //将jdk中的SocketChannel对象封装成netty使用的NioSocketChannel
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}
复制代码

可以看到这个方法只做了一件事也就是将jdk中的nio过渡到netty,封装成NioSocketChannel,也是第一次涉及客户端通道,之前都是围绕NioServerSocketChannel展开的.

已经获取到新的连接,那就可以遍历连接传播read入站事件了,如下:

for (int i = 0; i < size; i ++) { //遍历读取到的新连接
    readPending = false;
    pipeline.fireChannelRead(readBuf.get(i)); //开始责任链调用,散播read入站事件
}
复制代码

之后就是4个处理器的调用,应该很容易理解一定会调用ServerBootstrapAcceptor中的read方法,我们可以看一下,如下:

@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg; //msg就是accept获取到的新链接SocketChannel

    child.pipeline().addLast(childHandler); //新增处理器,这个处理器是我们服务端启动时配置的处理器ChannelInitializer

    setChannelOptions(child, childOptions, logger); //通道设置
    setAttributes(child, childAttrs);

    try { //通道注册,从subEventLoopGroup中选择一个nioEventLoop来处理IO操作,将通道与EventLoop绑定,触发register事件和active事件,然后再自动注册OP_READ(和main没区别)
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}
复制代码

写了一些注释应该比较清楚了,这个方法的作用就是分发,main负责接收新连接(accept)然后分发给sub去执行之后的逻辑(read).

到此为止服务端pipeline中的处理器应该明白了,并且服务端如何接收新连接,已经接收到新连接之后是如何分发的也应该清楚了,那么是否到现在来了一个read事件应该由childGroup中的EventLoop去轮询处理了呢?让我们看一下

分析read入站事件的处理

有了上面三个事件的分析,我们应该应该总结一下,整个链路分析的准则是什么?

  1. 这是一个什么事件?
  2. 在执行或分发这个事件的时候这个链路(pipeline)中当前有什么处理器?
  3. 哪些会被触发?
  4. 执行顺序是什么?
  5. 事件执行完毕后还剩下什么处理器,是否有新增或删除?

我们去看一下read入站事件,和accept事件一样,我们重点关注的应该是unsafe.read(),既然是read事件,那么肯定是sub中的Eventloop去吃力,其初始化时使用的unsafe就是AbstractNioByteChannel,也就是调用其read方法,如下:

@Override
    public final void read() { //客户端发送的请求具体数据的读取
        final ChannelConfig config = config();
        if (shouldBreakReadReady(config)) {
            clearReadPending();
            return;
        }
        final ChannelPipeline pipeline = pipeline();
        final ByteBufAllocator allocator = config.getAllocator();
        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();
        allocHandle.reset(config);

        ByteBuf byteBuf = null;
        boolean close = false;
        try {
            do {
                byteBuf = allocHandle.allocate(allocator); //申请一个字节缓冲区
                allocHandle.lastBytesRead(doReadBytes(byteBuf)); //将具体数据读取到字节缓冲区
                if (allocHandle.lastBytesRead() <= 0) {
                    // nothing was read. release the buffer.
                    byteBuf.release();
                    byteBuf = null;
                    close = allocHandle.lastBytesRead() < 0;
                    if (close) {
                        // There is nothing left to read as we received an EOF.
                        readPending = false;
                    }
                    break;
                }

                allocHandle.incMessagesRead(1);
                readPending = false;
                pipeline.fireChannelRead(byteBuf); //将读出的内容分发到责任链交由处理器去处理
                byteBuf = null;
            } while (allocHandle.continueReading());

            allocHandle.readComplete();
            pipeline.fireChannelReadComplete();

            if (close) {
                closeOnRead(pipeline);
            }
        } catch (Throwable t) {
            handleReadException(pipeline, byteBuf, t, close, allocHandle);
        } finally {
            // Check if there is a readPending which was not processed yet.
            // This could be for two reasons:
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
            //
            // See https://github.com/netty/netty/issues/2254
            if (!readPending && !config.isAutoRead()) {
                removeReadOp();
            }
        }
    }
}
复制代码

和另一个read方法一样,另一个是为了读取连接而这是是为了读取内容,之后交由责任链去处理读取到的具体数据.

现在我们知道是read入站事件,并且pipeline中有三个处理器(这3个处理器如果具体想知道什么时候加入的可以去阅读accept事件最后的注册逻辑.),分别是HeadContext,EchoServerHandler(自定义的)以及TailContext,这三个处理器都会被触发,是由HeadContext开始执行的,那么我们只需要关注自定义的EchoServerHandler就行了,如下:

package io.netty.example.echo;

import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * Handler implementation for the echo server.
 */
@Sharable
public class EchoServerHandler extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ctx.write(msg);
    }

    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        ctx.flush();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        // Close the connection when an exception is raised.
        cause.printStackTrace();
        ctx.close();
    }
}
复制代码

就一个时,触发read事件的时候写回去,也就是再触发write出站事件(不再细说,可以理解为打印就行了).

自定义处理器

明白了调用流程,当我们在实际应用的时候就可以自定义三个处理器来执行业务逻辑:

  • 协议解码器:将字节数组转换为java对象进行处理
  • 业务逻辑处理器:处理java对象
  • 协议编码器:将java对象编码成字节数组返回

小站地址:http://175.24.172.160:3000/#/netty/responsibilityChainDesignMode