Netty-9 Channel和UnSafe

(117) 2024-05-18 08:01:01

一、Channel简介

Channel是Netty的网络抽象类,聚合了很多的功能,包括网络读、写,客户端发起连接,主动关闭连接,链路关闭,获取通讯双方的网络地址等。也包括了Netty框架自身的一些功能,如获取该Channel的EventLoop,获取缓冲分配器ByteBufferAllocator和pipeline等等。

二、Channel功能介绍

Netty-9 Channel和UnSafe (https://mushiming.com/)  第1张
以上是Channel接口提供的一些方法,很多比较重要,所以这里简单的进行介绍:

  1. alloc():获取当前Channel使用的ByteBuf缓冲区分配器
  2. read():从Channel中读取数据,如果数据被读取成功,那么就会触发io.netty.channel.ChannelInboundHandler#channelRead方法,读取操作调用完成之后,会调用io.netty.channel.ChannelInboundHandler#channelReadComplete
  3. flush():将之前写入到唤醒消息数组的数据全部写到目标Channel中;
  4. disconnect(ChannelPromise var1):请求断开远程的连接并使用ChannelPromise来处理操作的结果
  5. bind(SocketAddress localAddress):绑定指定的本地Socket地址
  6. config():获取当前Channel的配置信息
  7. isOpen():判断Channel是否已经打开
  8. isRegistered():判断当前Channel是否已经注册到EventLoop上;
  9. isActive():是否已经处于激活状态
  10. metadata():获取当前Channel的元数据信息,包括TCP参数配置等等
  11. localAddress():获取当前Channel绑定的本地地址;
  12. remoteAddress():获取当前Channel绑定的远程地址;
  13. eventLoop():获取当前Channel注册到的EventLoop,本质上是Reactor线程组;
  14. parent():对于服务端来说,会返回空,对于客户端来说,会返回创建他的ServerSocketChannel;
  15. id():返回一个ChannelId对象,是Channel的唯一标志,生成策略如下:
  • 机器的MAC地址,可以代表全局唯一的信息;
  • 当前的进程ID;
  • 当前系统时间的毫秒数;
  • 当前系统时间纳秒数;
  • 32位整形随机数;
  • 32位自增的序列数;
    参看DefaultChannelId原代码:
  private DefaultChannelId() {
    this.data = new byte[MACHINE_ID.length + 4 + 4 + 8 + 4];
    int i = 0;
    System.arraycopy(MACHINE_ID, 0, this.data, i, MACHINE_ID.length);
    int i = i + MACHINE_ID.length;
    i = this.writeInt(i, PROCESS_ID);
    i = this.writeInt(i, nextSequence.getAndIncrement());
    i = this.writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
    int random = PlatformDependent.threadLocalRandom().nextInt();
    i = this.writeInt(i, random);
    assert i == this.data.length;
    this.hashCode = Arrays.hashCode(this.data);
  }

三、Channel继承体系

这里分别查看服务端的NioServerSocketChannel和客户端的NioSocketChannel

(一)NIOServerSocketChannel继承体系:

Netty-9 Channel和UnSafe (https://mushiming.com/)  第2张

(二)NioSocketChannel继承体系:

Netty-9 Channel和UnSafe (https://mushiming.com/)  第3张

(三)AbstractChannel

1. 成员变量

Netty-9 Channel和UnSafe (https://mushiming.com/)  第4张
可以看到,除了前面定义的一些异常之外,就是对Channel的一些方法对应的变量,所以这里不做过多讲解。

2. 成员方法

比较重要的是:

  1. write(Object msg):写出数据,不过并没有直接写出,而是放到了环形缓冲区,当调用了flush方法的时候,才会真正的写出
  2. writeAndFlush(Object msg):写出数据,同时flush
  3. connect:建立连接

(二)AbstractNioChannel

1. 成员变量

  1. protected final int readInterestOp:代表的是SelectionKey的OP_READ;
  2. volatile SelectionKey selectionKey:注册到EventLoop返回的选择键,由于Channel会面临并发写情况,需要修改selectionKey,需要让其他线程知道,所以使用volatile修饰;
  3. private ChannelPromise connectPromise:处理注册结果;
  4. ScheduledFuture<?> connectTimeoutFuture:连接超时定时器

2. 核心API源码

(1) Channel的注册

  protected void doRegister() throws Exception {
    boolean selected = false;

    while(true) {
      try {
        this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
        return;
      } catch (CancelledKeyException var3) {
        if (selected) {
          throw var3;
        }

        this.eventLoop().selectNow();
        selected = true;
      }
    }
  }

这里定义了一个boolean的变量selected代表是否注册成功,然后循环调用SelectableChannel的register方法进行注册,直到成功为止。
注册的时候需要指定监听的网络操作位来表示Channel对哪些网络事件感兴趣,网络事事件定义在SelectionKey里面,定义如下:

  • public static final int OP_READ = 1 << 0:读操作位
  • public static final int OP_WRITE = 1 << 2:写操作位
  • public static final int OP_CONNECT = 1 << 3:客户端连接服务器操作位
  • public static final int OP_ACCEPT = 1 << 4:服务端接受客户端操作连接为

我们看到,这里面传入的是0,表示不对任何事件感兴趣,仅仅是完成事件的注册。注册时候可以携带附件(这里携带的是当前Channel实例),后续Channel接收到之后,可以重新获取到之前的附件信息进行处理。

(三)AbstractNioByteChannel

1. 成员变量

就一个flushTask,用来负责继续写半包消息。

2. 核心API源码

(1) doWrite方法

  protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = this.config().getWriteSpinCount();

    do {
      Object msg = in.current(); //①
      if (msg == null) {  //②
        this.clearOpWrite();
        return;
      }
      writeSpinCount -= this.doWriteInternal(in, msg); //③
    } while(writeSpinCount > 0); //④
    this.incompleteWrite(writeSpinCount < 0);
  }

这里会从循环消息队列中弹出一条消息 。判断是否为null。
如果为空,则证明数组中所有等待发送的消息全部发送完毕,就清空半包标志 ,并且退出循环。清空半包标志代码如下:

  protected final void clearOpWrite() {
    SelectionKey key = this.selectionKey();
    if (key.isValid()) {
      int interestOps = key.interestOps(); // ⑤
      if ((interestOps & 4) != 0) { //⑥
        key.interestOps(interestOps & -5); //⑦
      }
    }
  }

从当前的SelectionKey中获取网路操作位 ,然后与Selection_OP_WRITE(4)做按位与 ,如果结果不为0,证明SelectionKey是isWritable的,那么就需要进行清除写操作位,清除的方法很简单,就是对操作位和 ~Selection_OP_WRITE(-5)进行按位与运算 ,说白了,就是把写操作位置成0。

如果不为空,进行写操作 。查看doWriteInternal方法可以发现,写出的是ByteBuf或者是FileRegion,这里不做过多赘述

(四)AbstractNioMessageChannel

1. 成员变量

  • inputShutdown:用来表示输入是否已经关闭

2. 核心API解析

比较重要的也是doWrite方法,如下:

protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    SelectionKey key = this.selectionKey();
    int interestOps = key.interestOps();

    while(true) {
      Object msg = in.current();
      if (msg == null) {
        if ((interestOps & 4) != 0) {
          key.interestOps(interestOps & -5);
        }
        break;
      }

      try {
        boolean done = false;

        for(int i = this.config().getWriteSpinCount() - 1; i >= 0; --i) {
          if (this.doWriteMessage(msg, in)) {
            done = true;
            break;
          }
        }

        if (!done) {
          if ((interestOps & 4) == 0) {
            key.interestOps(interestOps | 4);
          }
          break;
        }

        in.remove();
      } catch (Exception var7) {
        if (!this.continueOnWriteError()) {
          throw var7;
        }

        in.remove(var7);
      }
    }

  }

其实核心逻辑和AbstractNioByteChannel类似,不同的地方在于写出的是POJO对象,而不是ByteBuf和FieldRegion,具体由子类对doWriteMessage的实现来决定。

(五)NioServerSocketChannel

1. 成员变量

  • ChannelMetadata METADATA:元数据信息;
  • ServerSocketChannelConfig config:用于配置TCP参数的配置对象;

2. 核心API解析

(1)打开ServerSocketChannel通道

  private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
    try {
      return provider.openServerSocketChannel(); // ①
    } catch (IOException var2) {
      throw new ChannelException("Failed to open a server socket.", var2);
    }
  }
  • 默认情况下①处使用的provider是KQueueSelectorProvider
public class KQueueSelectorProvider extends SelectorProviderImpl {
  public KQueueSelectorProvider() {
  }

  public AbstractSelector openSelector() throws IOException {
    return new KQueueSelectorImpl(this);  
  }
}
  • 我们发现,并没有openServerSocketChannel方法,所以实际上是在父类SelectorProviderImpl中
public abstract class SelectorProviderImpl extends SelectorProvider {
  ......
  public ServerSocketChannel openServerSocketChannel() throws IOException {
    return new ServerSocketChannelImpl(this); //这是NIO的ServerSocket实现
  }
  ......
}

(2)读取数据

protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = SocketUtils.accept(this.javaChannel()); //①	

    try {
      if (ch != null) { //
        buf.add(new NioSocketChannel(this, ch));
        return 1;
      }
    } catch (Throwable var6) {
      logger.warn("Failed to create a new channel from an accepted socket.", var6);

      try {
        ch.close();
      } catch (Throwable var5) {
        logger.warn("Failed to close a socket.", var5);
      }
    }

    return 0;
  }
  • 接受客户端的SocketChannel
  • 如果不为空,则利用接收到的SocketChannel创建NioSocketChannel;
  • 把创建好的NioSocketChannel添加到List<Object>中(在NIOServerSocketChannel中到这里就算完成了);
  • 返回1,表示消息读取成功;
  • 也就是说,NioServerSocketChannel的读取操作就是接受客户端的连接,创建NioSocketChannel对象。

(六)NioSocketChannel

1. 成员变量

  • SocketChannelConfig config : 客户端TCP配置实例

2. 核心API解析


四、Unsafe介绍

Unsafe实际上Channel接口的辅助接口,它不应该被用户代码直接调用,而不是说他不安全。实际上Channel接口的很多操作都是由它来实现的。
Netty-9 Channel和UnSafe (https://mushiming.com/)  第5张

五、Unsafe接口继承体系

Netty-9 Channel和UnSafe (https://mushiming.com/)  第6张

THE END

发表回复