Unsafe类实际上是Channel接口的辅助类,实际的IO操作都是由Unsafe接口完成的。
一、Unsafe继承关系图
二、AbstractUnsafe源码分析
1. register方法
register方法主要用于将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用DefaultChannelPipeline的fireChannelRegisted方法,如果Channel被激活,则调用fireChannelActive方法。
public final void register(final ChannelPromise promise) { // 当前线程是否为Channel对应的NioEventLoop线程 if (eventLoop.inEventLoop()) { // 如果是,则不存在多线程并发操作,直接注册 register0(promise); } else { // 如果不是,说明是其他线程或用户线程发起的注册,存在并发操作,将其放进NioEventLoop任务队列中执行 try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); promise.setFailure(t); } }}private void register0(ChannelPromise promise) { try { // 判断Channel是否打开了 if (!ensureOpen(promise)) { return; } // 调用AbstractNioChannel的doRegister方法。请见 Netty源码分析-Channel doRegister(); registered = true; promise.setSuccess(); // 注册成功 pipeline.fireChannelRegistered(); if (isActive()) { // Channel被激活 pipeline.fireChannelActive(); } } catch (Throwable t) { // Close the channel directly to avoid FD leak. closeForcibly(); closeFuture.setClosed(); if (!promise.tryFailure(t)) { logger.warn( "Tried to fail the registration promise, but it is complete already. " + "Swallowing the cause of the registration failure:", t); } }}// AbstractNioChannel.doRegister()protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { // Force the Selector to select now as the "canceled" SelectionKey may still be // cached and not removed because no Select.select(..) operation was called yet. eventLoop().selectNow(); selected = true; } else { // We forced a select operation on the selector before but the SelectionKey is still cached // for whatever reason. JDK bug ? throw e; } } }}
2. bind方法
bind方法主要用于绑定指定端口。对于服务端,用于绑定监听端口,并设置backlog参数;对于客户端,用于指定客户端Channel的本地绑定Socket地址。
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) { if (!ensureOpen(promise)) { return; } // See: https://github.com/netty/netty/issues/576 if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() && Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) && localAddress instanceof InetSocketAddress && !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) { // Warn a user about the fact that a non-root user can't receive a // broadcast packet on *nix if the socket is bound on non-wildcard address. logger.warn( "A non-root user can't receive a broadcast packet if the socket " + "is not bound to a wildcard address; binding to a non-wildcard " + "address (" + localAddress + ") anyway as requested."); } // 是否是激活状态 boolean wasActive = isActive(); try { doBind(localAddress); } catch (Throwable t) { promise.setFailure(t); closeIfClosed(); return; } if (!wasActive && isActive()) { // 如果是在绑定阶段成为active状态,则将调用fireChannelActive方法放进NioEventLoop执行队列中 invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelActive(); } }); } promise.setSuccess();}private void invokeLater(Runnable task) { eventLoop().execute(task);}
NioSocketChannel 的 diBind 实现:
protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress);}
NioServerSocketChannel 的 doBind 实现:
protected void doBind(SocketAddress localAddress) throws Exception { javaChannel().socket().bind(localAddress, config.getBacklog());}
3. disconnect方法
该方法用于客户端或服务端主动关闭连接。
public final void disconnect(final ChannelPromise promise) { boolean wasActive = isActive(); try { doDisconnect(); } catch (Throwable t) { promise.setFailure(t); closeIfClosed(); return; } if (wasActive && !isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelInactive(); } }); } promise.setSuccess(); closeIfClosed(); // doDisconnect() might have closed the channel}
NioServerSocketChannel.doDisconnect():服务端不支持主动关闭连接
protected void doDisconnect() throws Exception { throw new UnsupportedOperationException();}
NioSocketChannel.doDisconnect():调用SocketChannel关闭连接
protected void doDisconnect() throws Exception { doClose();}protected void doClose() throws Exception { javaChannel().close();}
4. close方法
public final void close(final ChannelPromise promise) { // 1. 是否处于刷新状态,如果处于刷新状态说明还有消息没发出去,需要等到所有消息发完后再关闭 // 放入队列中处理 if (inFlush0) { invokeLater(new Runnable() { @Override public void run() { close(promise); } }); return; } // 2. 判断关闭操作是否完成,如果已完成,则不需要重复关闭链路,设置promise成功即可 if (closeFuture.isDone()) { // Closed already. promise.setSuccess(); return; } // 3. 执行关闭操作,将消息发送缓冲数组置空,通知JVM回收 boolean wasActive = isActive(); ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer. try { // 4. 关闭链路,本质是调用javaChannel的close方法 doClose(); closeFuture.setClosed(); promise.setSuccess(); } catch (Throwable t) { closeFuture.setClosed(); promise.setFailure(t); } // 5. 调用ChannelOutboundBuffer.close()释放缓冲区消息,将链路关闭通知事件放进NioEventLoop执行队列中 try { outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION); } finally { if (wasActive && !isActive()) { invokeLater(new Runnable() { @Override public void run() { pipeline.fireChannelInactive(); } }); } // 6. 将Channel从多路复用器上取消注册 deregister(); }}protected void doDeregister() throws Exception { eventLoop().cancel(selectionKey());}// 实际上就是将SelectionKey对应的Channel从多路复用器上去取消注册void cancel(SelectionKey key) { key.cancel(); cancelledKeys ++; if (cancelledKeys >= CLEANUP_INTERVAL) { cancelledKeys = 0; needsToSelectAgain = true; }}
5. write方法
write方法实际上是将消息添加到环形发送数组上,并不真正的写Channel(真正的写Channel是flush方法)。
public void write(Object msg, ChannelPromise promise) { if (!isActive()) { // 未激活,TCP链路还没建立成功,根据Channel打开情况设置不同的异常 if (isOpen()) { promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION); } else { promise.tryFailure(CLOSED_CHANNEL_EXCEPTION); } // 无法发送,释放msg对象 ReferenceCountUtil.release(msg); } else { // 链路状态正常,将数据和promise放进发送缓冲区 outboundBuffer.addMessage(msg, promise); }}
6. flush方法
前面提到,write方法负责将消息放进发送缓冲区,并没有真正的发送,而flush方法就负责将发送缓冲区中待发送的消息全部写进Channel中并发送。
public void flush() { ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null) { return; } // 先将unflush指针修改为tail,标识本次发送的范围 outboundBuffer.addFlush(); flush0();}protected void flush0() { if (inFlush0) { // Avoid re-entrance return; } final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer; if (outboundBuffer == null || outboundBuffer.isEmpty()) { return; } inFlush0 = true; // Mark all pending write requests as failure if the channel is inactive. if (!isActive()) { try { if (isOpen()) { outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION); } else { outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION); } } finally { inFlush0 = false; } return; } try { // 调用NioSocketChannel的write方法 doWrite(outboundBuffer); } catch (Throwable t) { outboundBuffer.failFlushed(t); } finally { inFlush0 = false; }}
三、AbstractNioUnsafe源码分析
1. connect方法
前面说到,NioSocketChannel的连接操作有三种可能:
1. 连接成功
2. 连接失败,关闭客户端连接
3. 连接暂未响应,监听OP_CONNECT
在connect方法中,如果连接成功,进行激活操作;如果连接暂未响应,则对其做一个监听,监听的内容是:如果连接失败,则关闭链路。
public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) { // 设置不可取消 && Channel是打开状态 if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } try { if (connectPromise != null) { // 已经有一个连接正在处理,直接抛异常 throw new ConnectionPendingException(); } boolean wasActive = isActive(); // doConnect方法具体看NioSocketChannel.doConnect()实现 if (doConnect(remoteAddress, localAddress)) { // 连接成功,进行连接后操作 fulfillConnectPromise(promise, wasActive); } else { // 连接失败,TCP无应答,结果暂未知晓 connectPromise = promise; requestedRemoteAddress = remoteAddress; // Schedule connect timeout. int connectTimeoutMillis = config().getConnectTimeoutMillis(); if (connectTimeoutMillis > 0) { connectTimeoutFuture = eventLoop().schedule(new Runnable() { @Override public void run() { ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise; ConnectTimeoutException cause = new ConnectTimeoutException("connection timed out: " + remoteAddress); if (connectPromise != null && connectPromise.tryFailure(cause)) { close(voidPromise()); } } }, connectTimeoutMillis, TimeUnit.MILLISECONDS); } promise.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isCancelled()) { if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; close(voidPromise()); } } }); } } catch (Throwable t) { promise.tryFailure(annotateConnectException(t, remoteAddress)); closeIfClosed(); }}private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) { if (promise == null) { // Closed via cancellation and the promise has been notified already. return; } // 判断当前激活状态 boolean active = isActive(); // 如果用户取消了连接,则返回false,需调用close方法关闭链路 boolean promiseSet = promise.trySuccess(); // 如果doConnect之前未激活,doConnect之后激活了,需要调用fireChannelActive(即使被取消了也应该调) if (!wasActive && active) { pipeline().fireChannelActive(); } // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive(). if (!promiseSet) { close(voidPromise()); }}
2. finishConnect方法
该方法用于判断连接操作是否结束。
首先判断当前线程是否就是EventLoop执行线程,不允许其他线程操作;
缓存当前active状态,用以下面是否要执行fireChannelActive方法;
调用javaChannel的finishConnect方法,该方法返回三种情况:
1)连接成功,返回true
2)连接失败,返回false
3)发生链路被关闭、链路中断异常,连接失败
根据javaChannel的返回值,如果返回false,直接抛出error,进入到catch模块
然后就根据连接状态做不同的后续处理
public final void finishConnect() { // Note this method is invoked by the event loop only if the connection attempt was // neither cancelled nor timed out. assert eventLoop().inEventLoop(); try { boolean wasActive = isActive(); // 通过javaChannel的finishConnect方法判断连接结果(如果连接失败则抛出Error,会走到catch块里) doFinishConnect(); // 连接成功方法:fulfillConnectPromise(ChannelPromise promise, boolean wasActive) fulfillConnectPromise(connectPromise, wasActive); } catch (Throwable t) { // 关闭链路方法:fulfillConnectPromise(ChannelPromise promise, Throwable cause) fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress)); } finally { // 如果连接超时时仍然没有收到服务端应答,则由定时任务关闭客户端连接,将SocketChannel从多路复用器上删除 if (connectTimeoutFuture != null) { connectTimeoutFuture.cancel(false); } connectPromise = null; }}
四、NioByteUnsafe源码分析
这里我们主要分析下它的 read方法。
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; ...
public RecvByteBufAllocator.Handle recvBufAllocHandle() {
if (recvHandle == null) { recvHandle = config().getRecvByteBufAllocator().newHandle(); } return recvHandle;}首先,获取NioSocketChannel的SocketChannelConfig,用于设置客户端连接的TCP参数。
继续看allocHandle的初始化,则从SocketChannelConfig的RecvByteBufAllocator中创建一个新的handle。
RecvByteBufAllocator有两个实现,分别是FixedRecvByteBufAllocator 和 AdaptiveRecvByteBufAllocator。FixedRecvByteBufAllocator 比较简单,我们主要分析下AdaptiveRecvByteBufAllocator。
根据名称就可以判断,AdaptiveRecvByteBufAllocator是根据本地读取的字节数动态调整下次接收缓冲区容量。
我们先看下AdaptiveRecvByteBufAllocator的 成员变量:
static final int DEFAULT_MINIMUM = 64;//最小缓冲区长度static final int DEFAULT_INITIAL = 1024;//初始容量static final int DEFAULT_MAXIMUM = 65536;//最大容量private static final int INDEX_INCREMENT = 4;//动态调整扩张步进索引private static final int INDEX_DECREMENT = 1;//动态调整收缩步进索引private static final int[] SIZE_TABLE;//长度向量表,数组的每个值对应一个Buffer容量// 初始化长度向量表// 当容量小于512时,由于缓冲区已经比较小,需要降低步进值,容量每次下调幅度降低// 当容量大于512时,说明需要解码的消息码流比较大,需要采用调大步进幅度的方式降低动态扩张频率static { ListsizeTable = new ArrayList (); for (int i = 16; i < 512; i += 16) { sizeTable.add(i); } for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); } SIZE_TABLE = new int[sizeTable.size()]; for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); }}
然后再来看一下AdaptiveRecvByteBufAllocator.getSizeTableIndex(..)方法:根据容量size查找容器向量表对应的索引
private static int getSizeTableIndex(final int size) { for (int low = 0, high = SIZE_TABLE.length - 1;;) { if (high < low) { return low; } if (high == low) { return high; } int mid = low + high >>> 1; int a = SIZE_TABLE[mid]; int b = SIZE_TABLE[mid + 1]; if (size > b) { low = mid + 1; } else if (size < a) { high = mid - 1; } else if (size == a) { return mid; } else { return mid + 1; } }}
然后我们再来看一下AdaptiveRecvByteBufAllocator的静态内部类HandlerImpl,该类有五个成员变量:
private final int minIndex; //最小索引private final int maxIndex; //最大索引private int index; //当前索引private int nextReceiveBufferSize; //下一次预分配的Buffer大小private boolean decreaseNow; //是否立即执行容量收缩操作
该类有一个比较重要的方法,record(int actualReadBytes),当NioSocketChannel执行完读操作后,会计算获得本轮轮询读取的总字节数,也就是record方法的入参actualReadBytes,该方法根据读取的字节数对ByteBuf进行动态伸缩和扩张。record操作步骤如下
1)将当前容量缩减后的值与实际读取的值做比较,如果实际读取的值小于收缩后的容量,则将缓冲区容量降低
2)如果实际读取的值大于当前Buffer容量,说明实际分配容量不足,需要动态扩张
private void record(int actualReadBytes) { if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) { if (decreaseNow) { index = max(index - INDEX_DECREMENT, minIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; } else { decreaseNow = true; } } else if (actualReadBytes >= nextReceiveBufferSize) { index = min(index + INDEX_INCREMENT, maxIndex); nextReceiveBufferSize = SIZE_TABLE[index]; decreaseNow = false; }}
AdaptiveRecvByteBufAllocator优点总结:
1. 性能更高。容量过大会导致内存占用开销增加,后续的Buffer处理性能会下降;容量过小时需要频繁的内存扩张来接收大的请求消息,同样会导致性能下降
2. 更节约内存。根据不同的场景动态的扩张或缩减内存,达到内存使用最优化。
然后我们接着来分析 read方法,这里循环读取缓冲区数据,并根据上次读取字节数动态调整ByteBuffer大小。每次读取都要触发一次read事件 fireChannelRead,注意,这里并不是说一次read就读完了全部消息,可能存在粘包拆包情况。
当上次读取了0个字节,说明已经读完了,跳出循环,触发读操作完成事件 fireChannelReadComplete。
public final void read() { final ChannelConfig config = config(); if (shouldBreakReadReady(config)) { clearReadPending(); return; } final ChannelPipeline pipeline = pipeline(); final ByteBufAllocator allocator = config.getAllocator(); final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle(); allocHandle.reset(config); ByteBuf byteBuf = null; boolean close = false; try { do { // 通过接收缓冲区分配器计算下次预分配的缓冲区容量并创建ByteBuffer byteBuf = allocHandle.allocate(allocator); // 这里分两步:1. doReadBytes(byteBuf):调用NioSocketChannel.doReadBytes(..),返回本次读取的字节数(返回0-无消息 返回小于0-发生了IO异常) // 2. 设置lastBytesRead,用以下面的处理 allocHandle.lastBytesRead(doReadBytes(byteBuf)); if (allocHandle.lastBytesRead() <= 0) { // 走到这里说明上一步没有读取到数据,释放ByteBuffer byteBuf.release(); byteBuf = null; close = allocHandle.lastBytesRead() < 0; if (close) { // 发生了IO异常,需关闭连接 readPending = false; } break; } allocHandle.incMessagesRead(1); readPending = false; // 一次读操作,触发一次read事件 pipeline.fireChannelRead(byteBuf); byteBuf = null; } while (allocHandle.continueReading()); allocHandle.readComplete(); // 触发读操作结束事件 pipeline.fireChannelReadComplete(); if (close) { closeOnRead(pipeline); } } catch (Throwable t) { handleReadException(pipeline, byteBuf, t, close, allocHandle); } finally { // Check if there is a readPending which was not processed yet. // This could be for two reasons: // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method // // See https://github.com/netty/netty/issues/2254 if (!readPending && !config.isAutoRead()) { removeReadOp(); } } }}protected int doReadBytes(ByteBuf byteBuf) throws Exception { final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle(); allocHandle.attemptedBytesRead(byteBuf.writableBytes()); return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());}public int writeBytes(ScatteringByteChannel in, int length) throws IOException { ensureWritable(length); int writtenBytes = setBytes(writerIndex, in, length); if (writtenBytes > 0) { writerIndex += writtenBytes; } return writtenBytes;}