Channel是Netty的网络抽象类,聚合了很多的功能,包括网络读、写,客户端发起连接,主动关闭连接,链路关闭,获取通讯双方的网络地址等。也包括了Netty框架自身的一些功能,如获取该Channel的EventLoop,获取缓冲分配器ByteBufferAllocator和pipeline等等。
以上是Channel接口提供的一些方法,很多比较重要,所以这里简单的进行介绍:
io.netty.channel.ChannelInboundHandler#channelRead
方法,读取操作调用完成之后,会调用io.netty.channel.ChannelInboundHandler#channelReadComplete
private DefaultChannelId() {
this.data = new byte[MACHINE_ID.length + 4 + 4 + 8 + 4];
int i = 0;
System.arraycopy(MACHINE_ID, 0, this.data, i, MACHINE_ID.length);
int i = i + MACHINE_ID.length;
i = this.writeInt(i, PROCESS_ID);
i = this.writeInt(i, nextSequence.getAndIncrement());
i = this.writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
int random = PlatformDependent.threadLocalRandom().nextInt();
i = this.writeInt(i, random);
assert i == this.data.length;
this.hashCode = Arrays.hashCode(this.data);
}
这里分别查看服务端的NioServerSocketChannel和客户端的NioSocketChannel
可以看到,除了前面定义的一些异常之外,就是对Channel的一些方法对应的变量,所以这里不做过多讲解。
比较重要的是:
(1) Channel的注册
protected void doRegister() throws Exception {
boolean selected = false;
while(true) {
try {
this.selectionKey = this.javaChannel().register(this.eventLoop().unwrappedSelector(), 0, this);
return;
} catch (CancelledKeyException var3) {
if (selected) {
throw var3;
}
this.eventLoop().selectNow();
selected = true;
}
}
}
这里定义了一个boolean的变量selected代表是否注册成功,然后循环调用SelectableChannel的register方法进行注册,直到成功为止。
注册的时候需要指定监听的网络操作位来表示Channel对哪些网络事件感兴趣,网络事事件定义在SelectionKey里面,定义如下:
我们看到,这里面传入的是0,表示不对任何事件感兴趣,仅仅是完成事件的注册。注册时候可以携带附件(这里携带的是当前Channel实例),后续Channel接收到之后,可以重新获取到之前的附件信息进行处理。
就一个flushTask,用来负责继续写半包消息。
(1) doWrite方法
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
int writeSpinCount = this.config().getWriteSpinCount();
do {
Object msg = in.current(); //①
if (msg == null) { //②
this.clearOpWrite();
return;
}
writeSpinCount -= this.doWriteInternal(in, msg); //③
} while(writeSpinCount > 0); //④
this.incompleteWrite(writeSpinCount < 0);
}
这里会从循环消息队列中弹出一条消息 ①。判断是否为null。
如果为空,则证明数组中所有等待发送的消息全部发送完毕,就清空半包标志 ②,并且退出循环。清空半包标志代码如下:
protected final void clearOpWrite() {
SelectionKey key = this.selectionKey();
if (key.isValid()) {
int interestOps = key.interestOps(); // ⑤
if ((interestOps & 4) != 0) { //⑥
key.interestOps(interestOps & -5); //⑦
}
}
}
从当前的SelectionKey中获取网路操作位 ⑤,然后与Selection_OP_WRITE(4)做按位与 ⑥,如果结果不为0,证明SelectionKey是isWritable的,那么就需要进行清除写操作位,清除的方法很简单,就是对操作位和 ~Selection_OP_WRITE(-5)进行按位与运算 ⑦,说白了,就是把写操作位置成0。
如果不为空,进行写操作 ③。查看doWriteInternal方法可以发现,写出的是ByteBuf或者是FileRegion,这里不做过多赘述
比较重要的也是doWrite方法,如下:
protected void doWrite(ChannelOutboundBuffer in) throws Exception {
SelectionKey key = this.selectionKey();
int interestOps = key.interestOps();
while(true) {
Object msg = in.current();
if (msg == null) {
if ((interestOps & 4) != 0) {
key.interestOps(interestOps & -5);
}
break;
}
try {
boolean done = false;
for(int i = this.config().getWriteSpinCount() - 1; i >= 0; --i) {
if (this.doWriteMessage(msg, in)) {
done = true;
break;
}
}
if (!done) {
if ((interestOps & 4) == 0) {
key.interestOps(interestOps | 4);
}
break;
}
in.remove();
} catch (Exception var7) {
if (!this.continueOnWriteError()) {
throw var7;
}
in.remove(var7);
}
}
}
其实核心逻辑和AbstractNioByteChannel类似,不同的地方在于写出的是POJO对象,而不是ByteBuf和FieldRegion,具体由子类对doWriteMessage的实现来决定。
(1)打开ServerSocketChannel通道
private static java.nio.channels.ServerSocketChannel newSocket(SelectorProvider provider) {
try {
return provider.openServerSocketChannel(); // ①
} catch (IOException var2) {
throw new ChannelException("Failed to open a server socket.", var2);
}
}
public class KQueueSelectorProvider extends SelectorProviderImpl {
public KQueueSelectorProvider() {
}
public AbstractSelector openSelector() throws IOException {
return new KQueueSelectorImpl(this);
}
}
public abstract class SelectorProviderImpl extends SelectorProvider {
......
public ServerSocketChannel openServerSocketChannel() throws IOException {
return new ServerSocketChannelImpl(this); //这是NIO的ServerSocket实现
}
......
}
(2)读取数据
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = SocketUtils.accept(this.javaChannel()); //①
try {
if (ch != null) { //
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable var6) {
logger.warn("Failed to create a new channel from an accepted socket.", var6);
try {
ch.close();
} catch (Throwable var5) {
logger.warn("Failed to close a socket.", var5);
}
}
return 0;
}
List<Object>
中(在NIOServerSocketChannel中到这里就算完成了);Unsafe实际上Channel接口的辅助接口,它不应该被用户代码直接调用,而不是说他不安全。实际上Channel接口的很多操作都是由它来实现的。