Netty 核心源码解读 —— ServerBootstrap 篇

本文我们就开始对 ServerBootstrap 进行源码解读(4.1.51.Final-SNAPSHOT),为什么是 ServerBootstrap,记得在用 Netty 做第一个项目的时候,写的第一行 Code 就是 new ServerBootstrap()
,ServerBootstrap 是 Netty Server 的启动类,所以从它开始了解 Netty 是最合适的。

ServerBootstrap

## TcpServer.java


private final EventLoopGroup bossGroup = new NioEventLoopGroup(BIZ_GROUP_SIZE); private final EventLoopGroup workerGroup = new NioEventLoopGroup(BIZ_THREAD_SIZE);
public void init() throws Exception { // Server 服务启动 ServerBootstrap bootstrap = new ServerBootstrap();
bootstrap.group(bossGroup, workerGroup); bootstrap.channel(NioServerSocketChannel.class); bootstrap.childHandler(new ServerChannelInitializer(serverConfig)); // 可选参数 bootstrap.childOption(ChannelOption.WRITE_BUFFER_HIGH_WATER_MARK, 32 * 1024);
// 绑定接口,同步等待成功 ChannelFuture future = bootstrap.bind(port).sync(); ChannelFuture channelFuture = future.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) throws Exception { } }); }

这是我在做 TCP 网关时写的 Netty Server 的代码片段(https://github.com/SongranZhang/tcp-gateway/blob/master/src/main/java/com/linkedkeeper/tcp/connector/tcp/server/TcpServer.java),可以看到,Netty Server 的初始化首先是通过 ServerBootstrap 的无参构造函数创建一个对象,接着是这个对象的一串链式调用 bootstrap.group().channel().childHandler().childOption()
,而服务启动的真正触发点是这段  bootstrap.bind(port).sync()
,下面我们就逐一来分析下这里的每个方法。

首先是 group()
方法。

## ServerBootstrap.java


private volatile EventLoopGroup childGroup;
public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup) { super.group(parentGroup); if (this.childGroup != null) { throw new IllegalStateException("childGroup set already"); } this.childGroup = ObjectUtil.checkNotNull(childGroup, "childGroup"); return this; }

这里 workerGroup 赋值给了 ServerBootstrap 的 childGroup,bossGroup 赋值给了父类 AbstractBootstrap 的 group。

## AbstractBootstrap.java


volatile EventLoopGroup group;
public B group(EventLoopGroup group) { ObjectUtil.checkNotNull(group, "group"); if (this.group != null) { throw new IllegalStateException("group set already"); } this.group = group; return self(); }

接下来是 
channel()
 方法。

## AbstractBootstrap.java


private volatile ChannelFactory extends C> channelFactory;
public B channel(Classextends C> channelClass) {
return channelFactory(new ReflectiveChannelFactory( ObjectUtil.checkNotNull(channelClass, "channelClass") )); }
public B channelFactory(ChannelFactory extends C> channelFactory) { ObjectUtil.checkNotNull(channelFactory, "channelFactory"); if (this.channelFactory != null) { throw new IllegalStateException("channelFactory set already"); }
this.channelFactory = channelFactory; return self(); }

这里 NioServerSocketChannel.class
通过 ReflectiveChannelFactory 进行了实例化,然后赋值给了 AbstractBootstrap 的 channelFactory。

接下来是 childHandler()
方法。

## ServerBootstrap.java


private volatile ChannelHandler childHandler;
public ServerBootstrap childHandler(ChannelHandler childHandler) { this.childHandler = ObjectUtil.checkNotNull(childHandler, "childHandler"); return this; }

这里是对 ServerBootstrap 的 childHandler 赋值。

最后是 childOption()
方法。

## ServerBootstrap.java


private final Map, Object> childOptions = new LinkedHashMap, Object>();
public ServerBootstrap childOption(ChannelOption childOption, T value) { ObjectUtil.checkNotNull(childOption, "childOption"); synchronized (childOptions) { if (value == null) { childOptions.remove(childOption); } else { childOptions.put(childOption, value); } } return this; }

这里 childOptions 维护了 TCP 的参数设置。

简言之 bootstrap.group().channel().childHandler().childOption()
就是在构建 Netty Server 的各种参数,下面再来看  bootstrap.bind(port).sync()

首先是 bind()
方法。

## AbstractBootstrap.java


public ChannelFuture bind(int inetPort) { return bind(new InetSocketAddress(inetPort)); }
public ChannelFuture bind(SocketAddress localAddress) { validate(); return doBind(ObjectUtil.checkNotNull(localAddress, "localAddress")); }
public B validate() { if (group == null) { throw new IllegalStateException("group not set"); } if (channelFactory == null) { throw new IllegalStateException("channel or channelFactory not set"); } return self(); }

这里的 
validate()
 方法对 AbstractBootstrap 的 group 和 channelFactory 进行非空校验,之后调用 
doBind()
 方法。

## AbstractBootstrap.java


private ChannelFuture doBind(final SocketAddress localAddress) { final ChannelFuture regFuture = initAndRegister(); final Channel channel = regFuture.channel(); if (regFuture.cause() != null) { return regFuture; }
if (regFuture.isDone()) { // At this point we know that the registration was complete and successful. ChannelPromise promise = channel.newPromise(); doBind0(regFuture, channel, localAddress, promise); return promise; } else { // Registration future is almost always fulfilled already, but just in case it's not. final PendingRegistrationPromise promise = new PendingRegistrationPromise(channel); regFuture.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { Throwable cause = future.cause(); if (cause != null) { // Registration on the EventLoop failed so fail the ChannelPromise directly to not cause an // IllegalStateException once we try to access the EventLoop of the Channel. promise.setFailure(cause); } else { // Registration was successful, so set the correct executor to use. // See https://github.com/netty/netty/issues/2586 promise.registered();
doBind0(regFuture, channel, localAddress, promise); } } }); return promise; } }

首先看一下 
initAndRegister()
 方法。

## AbstractBootstrap.java


final ChannelFuture initAndRegister() { Channel channel = null; try { channel = channelFactory.newChannel(); init(channel); } catch (Throwable t) { if (channel != null) { // channel can be null if newChannel crashed (eg SocketException("too many open files")) channel.unsafe().closeForcibly(); // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(channel, GlobalEventExecutor.INSTANCE).setFailure(t); } // as the Channel is not registered yet we need to force the usage of the GlobalEventExecutor return new DefaultChannelPromise(new FailedChannel(), GlobalEventExecutor.INSTANCE).setFailure(t); }
ChannelFuture regFuture = config().group().register(channel); if (regFuture.cause() != null) { if (channel.isRegistered()) { channel.close(); } else { channel.unsafe().closeForcibly(); } }
// If we are here and the promise is not failed, it's one of the following cases: // 1) If we attempted registration from the event loop, the registration has been completed at this point. // i.e. It's safe to attempt bind() or connect() now because the channel has been registered. // 2) If we attempted registration from the other thread, the registration request has been successfully // added to the event loop's task queue for later execution. // i.e. It's safe to attempt bind() or connect() now: // because bind() or connect() will be executed *after* the scheduled registration task is executed // because register(), bind(), and connect() are all bound to the same thread.
return regFuture; }

这里 
channelFactory.newChannel()
 调用的是 ReflectiveChannelFactory 的 newChannel 方法。

## ReflectiveChannelFactory


private final Constructorextends T> constructor;
public ReflectiveChannelFactory(Classextends T> clazz) {
ObjectUtil.checkNotNull(clazz, "clazz"); try { this.constructor = clazz.getConstructor(); } catch (NoSuchMethodException e) { throw new IllegalArgumentException("Class " + StringUtil.simpleClassName(clazz) + " does not have a public non-arg constructor", e); } }
public T newChannel() { try { return constructor.newInstance(); } catch (Throwable t) { throw new ChannelException("Unable to create Channel from class " + constructor.getDeclaringClass(), t); } }

这里 
constructor.newInstance()
 是 
NioServerSocketChannel.class
 的一个实例。得到 channel 后,调用 
init(channel)
 进行初始化,一是给 options 和 attrs 赋值,二是构建 pipeline。

## ServerBootstrap.java


void init(Channel channel) { setChannelOptions(channel, newOptionsArray(), logger); setAttributes(channel, attrs0().entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY));
ChannelPipeline p = channel.pipeline();
final EventLoopGroup currentChildGroup = childGroup; final ChannelHandler currentChildHandler = childHandler; final Entry>, Object>[] currentChildOptions; synchronized (childOptions) { currentChildOptions = childOptions.entrySet().toArray(EMPTY_OPTION_ARRAY); } final Entry>, Object>[] currentChildAttrs = childAttrs.entrySet().toArray(EMPTY_ATTRIBUTE_ARRAY);
p.addLast(new ChannelInitializer() { @Override public void initChannel(final Channel ch) { final ChannelPipeline pipeline = ch.pipeline(); ChannelHandler handler = config.handler(); if (handler != null) { pipeline.addLast(handler); }
ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); } }); }

回到 
initAndRegister()
 方法中,
init(channel)
 之后是 
register(channel)
,该方法在 NioEventLoopGroup 的父类 MultithreadEventLoopGroup 中实现,我们在解读 NioEventLoop 源码时再分析。

## MultithreadEventLoopGroup


public ChannelFuture register(Channel channel) { return next().register(channel); }

看完 
initAndRegister()
,再回到 
doBind()
 接着看 
doBind0()

## AbstractBootstrap


private static void doBind0( final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up // the pipeline in its channelRegistered() implementation. channel.eventLoop().execute(new Runnable() { @Override public void run() { if (regFuture.isSuccess()) { channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { promise.setFailure(regFuture.cause()); } } }); }

这里 regFuture.isSuccess()
会执行  channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
,否者执行  promise.setFailure(regFuture.cause());
,这里的 promise 可以认为是一种特殊的 Future 对象。bind 是在 ChannelPipeline 里进行绑定的,我们在解读 ChannelPipeline 源码时再分析。

最后看一下 bootstrap.bind(serverPort).sync()
中的  sync()
bootstrap.bind(serverPort)
返回的是 ChannelFuture,所以  sync()
是调用 DefaultChannelPromise 的方法。

## DefaultChannelPromise


public ChannelPromise sync() throws InterruptedException { super.sync(); return this; }

这里 
super.sync();
 调用了父类的方法。

## DefaultPromise


public Promise sync() throws InterruptedException { await(); rethrowIfFailed(); return this; }
public Promise await() throws InterruptedException { if (isDone()) { return this; }
if (Thread.interrupted()) { throw new InterruptedException(toString()); }
checkDeadLock();
synchronized (this) { while (!isDone()) { incWaiters(); try { wait(); } finally { decWaiters(); } } } return this; }

这里 while(!isDone())
会进入循环,调用  sync()
后线程会被阻塞住。

总结

本篇也是写了好久,本文介绍了 ServerBootstrap,它是构建 Netty Server 的主要实现类,ServerBootstrap 里主要是对各种属性进行赋值,并创建 Channel 和 ChannelPipeline,最后绑定本地端口开始监听 IO 事件。在后续的文章里,我会继续与大家讨论 Netty 的 EventLoop,还请大家多多关注我的个人博客或公账号。