189 8069 5689

Netty是如何绑定端口和启动服务的

本篇内容介绍了“Netty是如何绑定端口和启动服务的”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

阿拉山口ssl适用于网站、小程序/APP、API接口等需要进行数据传输应用场景,ssl证书未来市场广阔!成为创新互联公司的ssl证书销售渠道,可以享受市场价格4-6折优惠!如果有意向欢迎电话联系或者加微信:028-86922220(备注:SSL证书合作)期待与您的合作!

服务端启动DEMO

      先从一个简单的服务端启动`DEMO`开始,以下是一个标准的`Netty`服务端代码

public final class NettyServer {public static void main(String[] args) throws Exception {        EventLoopGroup bossGroup = new NioEventLoopGroup();        EventLoopGroup workerGroup = new NioEventLoopGroup();try {            ServerBootstrap serverBootstrap = new ServerBootstrap();            serverBootstrap.group(bossGroup, workerGroup)                    .channel(NioServerSocketChannel.class)                    .childHandler(new ChannelInitializer() {@Overridepublic void initChannel(SocketChannel channel) {                            ChannelPipeline channelPipeline = channel.pipeline();                            channelPipeline.addLast("decoder", new StringDecoder());                            channelPipeline.addLast("encoder", new StringEncoder());                            channelPipeline.addLast("handler", new ServerHandler());                        }                    });            ChannelFuture channelFuture = serverBootstrap.bind(8888).sync();            channelFuture.channel().closeFuture().sync();        } finally {            bossGroup.shutdownGracefully();            workerGroup.shutdownGracefully();        }    }}br

注:ServerBootstrap.childHandler()用于指定处理新连接数据的读写处理逻辑,同时ServerBootstrap还提供handler()用于指定在服务端启动过程中的一些逻辑,通常情况下我们用不着这个方法

`ServerHandler`代码如下:

public class ServerHandler extends ChannelInboundHandlerAdapter {    @Override    public void channelActive(ChannelHandlerContext ctx) {        System.out.println("channelActive");    }    @Override    public void channelRegistered(ChannelHandlerContext ctx) {        System.out.println("channelRegistered");    }    @Override    public void handlerAdded(ChannelHandlerContext ctx) {        System.out.println("handlerAdded");    }    @Override    public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {        System.out.println("channelReadComplete");    }    @Override    public void channelInactive(ChannelHandlerContext ctx) throws Exception {        System.out.println("channelInactive");    }    @Override    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {        System.out.println("service receive msg:" + msg);    }}br

 当有新连接接入时,控制台打印出

reStructuredTexthandlerAddedchannelRegisteredchannelActivebr

     但接收到新消息时,控制台打印出

reStructuredTextservice receive msg:xxxchannelReadCompletebr

       本文主要分析服务端的启动过程,而新连接接入 新消息的读取会在后续章节中说明

服务端启动源码分析

`ServerBootstrap`是`Netty`为方便开发者使用而设计的一个启动类,其核心代码入口在`bind()`,代码如下

public ChannelFuture bind(int inetPort) {return bind(new InetSocketAddress(inetPort));}//通过端口号创建一个`InetSocketAddress`,然后继续调用重载的`bind()`public ChannelFuture bind(SocketAddress localAddress) {// ...return doBind(localAddress);}br

由于博客篇幅有限所以有些健壮性分支会以`// ...`略过,健壮性分支不会影响对`Netty`主流程的理解。

`Netty`服务端启动最后会调用到`doBind()`方法,代码如下

private ChannelFuture doBind(final SocketAddress localAddress) {//...final ChannelFuture regFuture = initAndRegister();//...final Channel channel = regFuture.channel();//...    doBind0(regFuture, channel, localAddress, promise);//...return promise;br

在`doBind()`中我们关注两个核心方法`initAndRegister()`以及`doBind0()`

服务端`Channel`创建

final ChannelFuture initAndRegister() {    Channel channel = null;// 新建Channel    channel = channelFactory.newChannel();// 初始化Channelinit(channel);// 将这个Channel Register到某个对象    ChannelFuture regFuture = config().group().register(channel);return regFuture;}br

`Channel`是`Netty`的核心概念之一,它是`Netty`网络通信的主体由它负责同对端进行网络通信、注册和数据操作等功能。

`Channel`的创建是由`channelFactory.newChannel()`完成的,接下来跟踪`channelFactory`是在何时被初始化,我们层层回溯最终发现是在这个函数中

public B channel(Class channelClass) {if (channelClass == null) {throw new NullPointerException("channelClass");    }return channelFactory(new ReflectiveChannelFactory(channelClass));}br

在`Demo`程序调用`.channel()`方法并传入`NioServerSocketChannel.class`

public class ReflectiveChannelFactory implements ChannelFactory {private final Class clazz;public ReflectiveChannelFactory(Class clazz) {if (clazz == null) {throw new NullPointerException("clazz");        }this.clazz = clazz;    }@Overridepublic T newChannel() {try {return clazz.newInstance();        } catch (Throwable t) {throw new ChannelException("Unable to create Channel from class " + clazz, t);        }    }}br

即在`Netty`服务端启动的时候通过反射方式(调用默认构造函数)来创建一个`NioServerSocketChannel`对象,

加下来我们继续跟进`NioServerSocketChannel`的默认构造函数

public NioServerSocketChannel() {this(newSocket(DEFAULT_SELECTOR_PROVIDER));}private static ServerSocketChannel newSocket(SelectorProvider provider) {try {// 利用SelectorProvider产生一个ServerSocketChannel对象return provider.openServerSocketChannel();    } catch (IOException e) {throw new ChannelException("Failed to open a server socket.", e);    }}br

通过`newSocket(DEFAULT_SELECTOR_PROVIDER)`创建一条`server`端`channel`,然后进入到以下方法

public NioServerSocketChannel(ServerSocketChannel channel) {super(null, channel, SelectionKey.OP_ACCEPT);    config = new NioServerSocketChannelConfig(this, javaChannel().socket());}br

该方法主要完成两个功能,首先是调用父类的构造方法然后初始化`NioServerSocketChannelConfig`属性

protected AbstractNioMessageChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent, ch, readInterestOp);}protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {super(parent);this.ch = ch;// 设置SelectionKey.OP_ACCEPT事件this.readInterestOp = readInterestOp;// 设置ServerSocketChannel为非阻塞的    ch.configureBlocking(false);}br

这里将前面通过`provider.openServerSocketChannel()`创建出来的`ServerSocketChannel`保存到成员变量,然后调用将该`channel`为非阻塞模式,这是个标准的`JDK NIO`编程的玩法。这里的`readInterestOp`即前面层层传入的`SelectionKey.OP_ACCEPT`,接下来继续跟进`super(parent)`

protected AbstractChannel(Channel parent) {this.parent = parent;    id = newId();unsafe = newUnsafe();    pipeline = newChannelPipeline();}br

在`AbstractChannel`的构造方法中主要是初始化了`id`,`unsafe`,`pipeline`属性

 服务端`Channel`初始化

在创建完`Channel`后,我们在`init`方法中对`Channel`进行初始化操作,代码如下

void init(Channel channel) throws Exception {// 给channel设置optionfinal Map, Object> options = options0();synchronized (options) {        channel.config().setOptions(options);    }// 给channel设置attrfinal Map, Object> attrs = attrs0();synchronized (attrs) {for (Entry, Object> e: attrs.entrySet()) {            AttributeKey key = (AttributeKey) e.getKey();            channel.attr(key).set(e.getValue());        }    }    ChannelPipeline p = channel.pipeline();final EventLoopGroup currentChildGroup = childGroup;final ChannelHandler currentChildHandler = childHandler;final Entry, Object>[] currentChildOptions;final Entry, Object>[] currentChildAttrs;synchronized (childOptions) {// 设置新接入channel的options        currentChildOptions = childOptions.entrySet().toArray(newOptionArray(childOptions.size()));    }synchronized (childAttrs) {// 设置新接入channel的attrs        currentChildAttrs = childAttrs.entrySet().toArray(newAttrArray(childAttrs.size()));    }    p.addLast(new ChannelInitializer() {@Overridepublic void initChannel(Channel ch) throws Exception {final ChannelPipeline pipeline = ch.pipeline();// 设置handler到pipeline上// 这里的handler()返回的就是.handler()所设置的值            ChannelHandler handler = config.handler();if (handler != null) {                pipeline.addLast(handler);            }            ch.eventLoop().execute(new Runnable() {@Overridepublic void run() {// p.addLast()向serverChannel的流水线处理器中加入了一个ServerBootstrapAcceptor// 从名字上就可以看出来这是一个接入器,专门接受新请求,把新的请求扔给某个事件循环器                    pipeline.addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));                }            });        }    });}br 

以上代码主要完成如下功能:

  • 为`channel`设置`option`及`attr`
  • 初始化服务端`channel`的`pipeline`
  • 添加自定义`handler`
  • 添加`ServerBootstrapAcceptor`(请求接入器)
`NioEventLoop.execute()`方法为`Netty Reactor`线程执行的入口,关于`Netty Reactor`线程我们将在下一篇中介绍。我们总结一下发现代码执行到这里`Netty`并未真正启动服务,只是初始化了一些基本的配置和属性以及在`pipeline`上加入了一个接入器用来专门接受新连接
 将`Channel`注册到事件轮询器
完成`Channel`注册的代码如下
ChannelFuture regFuture = config().group().register(channel);```此处`config().group()`返回的对象为`NioEventLoopGroup````java@Overridepublic ChannelFuture register(Channel channel) {// 调用了NioEventLoop对象中的register方法// NioEventLoop extends SingleThreadEventLoopreturn next().register(channel);}br
在`next`方法中返回一个`EventLoop`对象,每一个`EventLoop`都与一个`selector`绑定
在之前的代码中`EventLoop`中的`Selector`一直没有任何`Channel`注册,所以每次`select`操作都是空,但从这行代码开始这个`selector`中开始有`Channel`注册
public ChannelFuture register(Channel channel) {    return register(new DefaultChannelPromise(channel, this));}public ChannelFuture register(final ChannelPromise promise) {    ObjectUtil.checkNotNull(promise, "promise");    promise.channel().unsafe().register(this, promise);    return promise;}br
这里可以看到`register`操作是委托给`Channel`中的`Unsafe`对象来执行的,这里的`Unsafe`对象对上文稍有印象的同学应该能知道这个就是创建`NioServerSocketChannel`的时候创建的`Unsafe`对象
public final void register(EventLoop eventLoop, final ChannelPromise promise) {// ...    AbstractChannel.this.eventLoop = eventLoop;// ...    register0(promise);}br
先将`EventLoop`事件循环器绑定到该`NioServerSocketChannel`上,然后调用`register0()`代码如下
 
private void register0(ChannelPromise promise) {    try {        boolean firstRegistration = neverRegistered;        doRegister();        neverRegistered = false;        registered = true;        pipeline.invokeHandlerAddedIfNeeded();        safeSetSuccess(promise);        pipeline.fireChannelRegistered();        if (isActive()) {            if (firstRegistration) {                pipeline.fireChannelActive();            } else if (config().isAutoRead()) {                beginRead();            }        }    } catch (Throwable t) {        closeForcibly();        closeFuture.setClosed();        safeSetFailure(promise, t);    }}//这一段其实也很清晰,先调用`doRegister()`进行注册protected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);            return;        } catch (CancelledKeyException e) {            // ...        }    }}br
在这里我们终于看到`JDK`底层的`Channel`注册到`Selector`的过程,但是这里的`OPS`为0即不关心任何事件,而我们期望`OPS`的值为`SelectionKey.OP_ACCEPT`,所以到了这里代码还没有结束。在执行完`Channel`注册后接着执行了几个`pipeline`相关的方法,我们后面详细剖析`pipeline`的时候再讲
服务端`Channel`端口绑定
private static void doBind0(final ChannelFuture regFuture, final Channel channel, final SocketAddress localAddress, final ChannelPromise promise) {    channel.eventLoop().execute(new Runnable() {@Overridepublic void run() {if (regFuture.isSuccess()) {                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);            } else {                promise.setFailure(regFuture.cause());            }        }    });}//在`dobind0()`方法中通过`EventLoop`执行一个任务,接下来我们进入到`channel.bind()`方法public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return pipeline.bind(localAddress, promise);}public final ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {return tail.bind(localAddress, promise);}br
       关于`Pipeline`相关的内容将在后续博客中介绍,当前一个比较好的方式就是`Debug`单步进入。
      跟踪调用链最后我们来到了`DefaultChannelPipeline.HeadContext`的`bind()`,代码如下
public void bind(ChannelHandlerContext ctx, SocketAddress localAddress, ChannelPromise promise) throws Exception {unsafe.bind(localAddress, promise);}//这里的`unsafe`就是前面提到的`AbstractUnsafe`, 准确点应该是`NioMessageUnsafe`@Overridepublic final void bind(final SocketAddress localAddress, final ChannelPromise promise) {// ...    boolean wasActive = isActive();// ...    doBind(localAddress);if (!wasActive && isActive()) {        invokeLater(new Runnable() {            @Overridepublic void run() {                pipeline.fireChannelActive();            }        });    }    safeSetSuccess(promise);}//在`doBind`方法中完成绑定操作,代码如下protected void doBind(SocketAddress localAddress) throws Exception {if (PlatformDependent.javaVersion() >= 7) {        javaChannel().bind(localAddress, config.getBacklog());    } else {        javaChannel().socket().bind(localAddress, config.getBacklog());    }}br

       最终调用到了`JDK`里面的`bind`方法真正进行了端口的绑定。按照正常流程我们前面已经分析到`isActive()`方法返回`false`,进入到`doBind()`之后如果`channel`被激活了,就发起`pipeline.fireChannelActive()`调用

public void channelActive(ChannelHandlerContext ctx) throws Exception {    ctx.fireChannelActive();    readIfIsAutoRead();}//`pipeline.channelActive`会逐一调用`pipeline`中每一个节点的`channelActive`方法,所以`HeadContext`的`channelActive`将会被调用,即`readIfIsAutoRead`方法将会被调用public void channelActive(ChannelHandlerContext ctx) throws Exception {    ctx.fireChannelActive();    readIfIsAutoRead();}private void readIfIsAutoRead() {if (channel.config().isAutoRead()) {        channel.read();    }}//最终这个方法会调用到`AbstractNioChannel`的`doBeginRead`方法protected void doBeginRead() throws Exception {final SelectionKey selectionKey = this.selectionKey;if (!selectionKey.isValid()) {return;    }    readPending = true;final int interestOps = selectionKey.interestOps();if ((interestOps & readInterestOp) == 0) {        selectionKey.interestOps(interestOps | readInterestOp);    }}br
     在最后一行中的`readInterestOp`
    即在上文中提到的`SelectionKey.OP_ACCEPT`,即让事件轮询器关注`Accept`事件,至此完成了`Channel`对`ACCEPT`事件的注册过程
总结
      到目前为止我们看到的代码相当于传统NIO编程中的如下代码
ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();// 建NioServerSocketChannelserverSocketChannel.configureBlocking(false);//AbstractNioChannel中ch.configureBlocking(false);serverSocketChannel.bind(new InetSocketAddress("localhost", 8888));// NioServerSocketChannel.doBind()Selector selector = Selector.open();// NioEventLoop.openSelector()serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT) //  AbstractNioChannel.doBeginRead()br
      服务端启动完成的主要功能为创建一个`Channel`,并且将`Channel`注册到`NioEventLoop`的`Selector`上.

“Netty是如何绑定端口和启动服务的”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注创新互联网站,小编将为大家输出更多高质量的实用文章!


文章标题:Netty是如何绑定端口和启动服务的
文章位置:http://gzruizhi.cn/article/iggegd.html