Netty是一个高性能基于异步事件驱动的网络应用程序框架,源码跟踪过程中断点设置的位置可参照我的GitHub项目debug-netty
一、Netty对三种I/O模式的支持
常用的三种I/O模式为:BIO、NIO、AIO,它们分别代表阻塞IO、非阻塞IO、异步IO
-
一次I/O的过程是怎样的?
- 首先用户程序发起系统调用,发生线程上下文切换,这个时候内核缓冲区内数据还没有就绪,需要等待数据从磁盘或者网卡拷贝到内核缓冲区,内核缓冲区数据就绪之后再拷贝至用户缓冲区,系统调用返回。
-
阻塞与非阻塞?
- 阻塞与非阻塞的区别在于数据就绪之前是否需要阻塞等待。
-
同步与异步?
- 同步与异步的区别在于数据就绪之后是否是应用程序自己去读取数据。
Netty支持BIO、NIO、AIO,对这三种IO模式的切换非常方便,只需直接切换EventLoopGroup和ServerSocketChannel即可。但在目前的主版本4.X中,BIO由于在连接数高的情况下耗资源高性能低已被废弃,AIO由于Linux下实现不够成熟、对比NIO的性能提升不明显也已经废弃。
二、Netty对三种Reactor模式的支持
Reactor是一种开发模式:注册感兴趣的事件->扫描是否有感兴趣的事件发生->事件发生之后做出相应的处理。
- 单线程Reactor:接收连接、读写、编解码、业务处理由同一个线程处理
- 多线程Reactor:编解码、业务处理使用线程池处理
- 主从Reactor:在多线程Reactor基础上单独用一个Reactor处理接收连接
// 单线程Reactor
EventLoopGroup eventGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventGroup)
// 多线程Reactor
EventLoopGroup eventGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(eventGroup)
// 主从Reactor
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup, workerGroup)
复制代码
三、Server启动过程分析
3.1 Server启动代码
这里我使用的是主从Reactor模式
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
final EchoServerHandler serverHandler = new EchoServerHandler();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline p = ch.pipeline();
p.addLast(new LoggingHandler(LogLevel.INFO));
p.addLast(serverHandler);
}
});
// Start the server.
ChannelFuture f = b.bind(PORT).sync();
// Wait until the server socket is closed.
f.channel().closeFuture().sync();
} finally {
// Shut down all event loops to terminate all threads.
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
复制代码
3.2 Server启动源码分析
建议打开源码debug查看
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
创建了一个NioEventLoopGroup,它的构造方法this(nThreads, executor, SelectorProvider.provider());
通过SelectorProvider.provider()方法中的provider = sun.nio.ch.DefaultSelectorProvider.create();
实现了根据不同平台返回不同SelectorProvider实现。如果没有手动指定线程数(即NioEventLoop数量),它的父类MultithreadEventLoopGroup的构造方法中确定会设置默认线程数DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt("io.netty.eventLoopThreads", NettyRuntime.availableProcessors() * 2));
。最后会在父类MultithreadEventExecutorGroup的构造方法中通过newChild()方法创建N个NioEventLoop。再创建一个选择器用来给channel分配NioEventLoopchooser = chooserFactory.newChooser(children);
- NioEventLoop的构造方法中会校验SelectorProvider,然后通过openSelector()方法创建一个Selector。
b.group(bossGroup, workerGroup)
校验bossGroup和workerGroup并设置为ServerBootStrap中的group和childGroup。.channel(NioServerSocketChannel.class)
方法创建了一个ReflectiveChannelFactory并且将设置构造器为NioServerSocketChannel的构造器,并将其设置为ServerBootStrap的channelFactory。Netty中切换3种IO模式的改动之一就是通过切换.channel()中传入的ServerSocketChannel。
public B channel(Class<? extends C> channelClass) {
// debug-netty-start 创建一个构造器为channelClass类构造器的ChannelFactory并设置为当前的channelFactory
return channelFactory(new ReflectiveChannelFactory<C>(
ObjectUtil.checkNotNull(channelClass, "channelClass")
));
}
public ReflectiveChannelFactory(Class<? extends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz");
try {
this.constructor = clazz.getConstructor();
} catch (NoSuchMethodException e) {
……
}
}
public B channelFactory(ChannelFactory<? extends C> channelFactory) {
ObjectUtil.checkNotNull(channelFactory, "channelFactory");
if (this.channelFactory != null) {
throw new IllegalStateException("channelFactory set already");
}
this.channelFactory = channelFactory;
return self();
}
复制代码
ChannelFuture f = b.bind(PORT).sync();
bind()方法进行端口绑定,sync()阻塞到绑定完成。initAndRegister()初始化注册channel是异步操作,所以判断Future是否完成,如果完成则调用doBind0()进行端口绑定,否则注册一个监听器监听完成开始端口绑定。
private ChannelFuture doBind(final SocketAddress localAddress) {
// debug-netty-start 创建初始化ServerSocketChannel并注册到Selector
final ChannelFuture regFuture = initAndRegister();
final Channel channel = regFuture.channel();
if (regFuture.cause() != null) {
return regFuture;
}
// debug-netty-start 上述操作完成则进行端口绑定
if (regFuture.isDone()) {
ChannelPromise promise = channel.newPromise();
doBind0(regFuture, channel, localAddress, promise);
return promise;
} else {
// debug-netty-start 注册一个监听器监听上述事件的完成
final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel);
regFuture.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
Throwable cause = future.cause();
if (cause != null) {
promise.setFailure(cause);
} else {
// Registration was successful, so set the correct executor to use.
promise.registered();
doBind0(regFuture, channel, localAddress, promise);
}
}
});
return promise;
}
}
复制代码
initAndRegister()方法中通过channel = channelFactory.newChannel();
创建NioServerSocketChannel。并且向这个channel的pipeline处理链中加入了一个ServerBootstrapAcceptor处理接收连接。然后通过EventExecutorChooser的next()方法选择一个NioEventLoop进行register操作,Netty中根据NioEventLoop的数量来选择EventExecutorChooser。
// 对应步骤1中的chooser = chooserFactory.newChooser(children);
public EventExecutorChooser newChooser(EventExecutor[] executors) {
// debug-netty-start 根据待绑定的executor长度(即NioEventLoop的数量)是否是2的幂次方来选择
if (isPowerOfTwo(executors.length)) {
return new PowerOfTwoEventExecutorChooser(executors);
} else {
return new GenericEventExecutorChooser(executors);
}
}
private static final class PowerOfTwoEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
// debug-netty-start 使用&运算效率更高
return executors[idx.getAndIncrement() & executors.length - 1];
}
}
private static final class GenericEventExecutorChooser implements EventExecutorChooser {
@Override
public EventExecutor next() {
// debug-netty-start 通过递增取模取正值的方法选择一个EventLoop
return executors[Math.abs(idx.getAndIncrement() % executors.length)];
}
}
复制代码
NioEventLoop的register()方法最后会调用到AbstractChannel中的register()方法。这个方法里会判断当前是否是EventLoop中的线程,如果是直接调用register0()方法,当前还是main()线程,所以将任务加入eventLoop中执行register0()方法,至此initAndRegister()方法结束。
@Override
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
ObjectUtil.checkNotNull(eventLoop, "eventLoop");
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
// debug-netty-start 判断当前线程是不是EventLoop
// debug-netty-connect
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn("Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
复制代码
- eventLoop.execute(……)时将这个任务加入队列,判断当前不是EventLoop线程,先启动EventLoop线程,启动线程之后执行队列中的所有任务,对应步骤5也就是执行register0()方法。
- register0()方法中调用doRegister()方法进行真正的注册操作。将当前channel(NioServerSocketChannel)注册到NioEventLoop绑定的selector上且不关注任何事件;注册完之后再判断当前channel是否打开且绑定端口,此时未绑定。
private void register0(ChannelPromise promise) {
try {
……
boolean firstRegistration = neverRegistered;
// 开始真正注册
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// debug-netty-start 判断当前channel是否打开并且绑定,此时并未绑定
// debug-netty-connect
if (isActive()) {
if (firstRegistration) {
// debug-netty-connect
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
……
}
}
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
// debug-netty-start 将当前channel register到NioEventLoop绑定的selector上,0表示不关注任何事件,this表示当前之channel,后激活设置
// debug-netty-connect
selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException e) {
……
}
}
}
复制代码
- 注册完成之后doBind()方法中会调用doBind0()方法(直接判断注册结果或监听注册结果之后调用)。doBind0()方法将端口绑定封装成一个任务提交至channel对应的eventLoop中。此时evenLoop已经启动(步骤6启动),直接获取任务执行。channel.bind()方法中调用
pipeline.bind(loalAddress, promise)
,pipeline链的第一个pipeline是HeadContext,它的bind方法调用了unsafe.bind(localAddress, promise) 也就是AbstractUnsafe的bind方法,在这个方法中调用了NioServerSocketChannel的doBind()方法根据jdk版本进行真正的端口绑定并根据当前channel的配置设置阻塞模式(这里是非阻塞)。端口绑定完成之后提交了一个任务到eventLoop执行pipeline.fireChannelActive()操作。
private static void doBind0(final ChannelFuture regFuture, final Channel channel,final SocketAddress localAddress, final ChannelPromise promise) {
// debug-netty-start 将绑定端口封装成一个任务通过eventLoop执行
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress,promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
// AbstractUnsafe
@Override
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
……
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
……
}
// debug-netty-start 激活成功,封装一个任务丢给eventLoop执行fireChannelActive操作
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
// NioServerSocketChannel
@Override
protected void doBind(SocketAddress localAddress) throws Exception {
// debug-netty-start 根据jdk版本进行真正的端口绑定,并设置是否阻塞模式
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
复制代码
- evenLoop获取任务执行pipeline.fireChannelActive()方法,首先会调用到HeadContext中channelActive()方法,它调用了readIfIsAutoRead()方法,然后调用到HeadContext.read()方法,最终通过AbstractNioChannel的doBeginRead()方法将channel关注的Selector ops设置为OP_ACCEPT。至此完成启动。
@Override
public void channelActive(ChannelHandlerContext ctx) {
// debug-netty-start 继续向后调用
ctx.fireChannelActive();
readIfIsAutoRead();
}
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
//AbstractNioChannel
@Override
protected void doBeginRead() throws Exception {
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
// debug-netty-start 将channel关注的selector ops设置为OP_ACCEPT
// debug-netty-connect 将channel关注的selector ops设置为OP_READ
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
复制代码
😘感谢您的阅读,有问题欢迎讨论,共同学习。
近期评论