博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
4. Netty源码分析之Unsafe
阅读量:5091 次
发布时间:2019-06-13

本文共 19993 字,大约阅读时间需要 66 分钟。

Unsafe类实际上是Channel接口的辅助类,实际的IO操作都是由Unsafe接口完成的。

一、Unsafe继承关系图

二、AbstractUnsafe源码分析

1. register方法

  register方法主要用于将当前Unsafe对应的Channel注册到EventLoop的多路复用器上,然后调用DefaultChannelPipeline的fireChannelRegisted方法,如果Channel被激活,则调用fireChannelActive方法。

public final void register(final ChannelPromise promise) {    // 当前线程是否为Channel对应的NioEventLoop线程    if (eventLoop.inEventLoop()) {        // 如果是,则不存在多线程并发操作,直接注册        register0(promise);    } else {        // 如果不是,说明是其他线程或用户线程发起的注册,存在并发操作,将其放进NioEventLoop任务队列中执行        try {            eventLoop.execute(new Runnable() {                @Override                public void run() {                    register0(promise);                }            });        } catch (Throwable t) {            logger.warn(                    "Force-closing a channel whose registration task was not accepted by an event loop: {}",                    AbstractChannel.this, t);            closeForcibly();            closeFuture.setClosed();            promise.setFailure(t);        }    }}private void register0(ChannelPromise promise) {    try {        // 判断Channel是否打开了        if (!ensureOpen(promise)) {            return;        }        // 调用AbstractNioChannel的doRegister方法。请见 Netty源码分析-Channel        doRegister();        registered = true;        promise.setSuccess();        // 注册成功        pipeline.fireChannelRegistered();        if (isActive()) {            // Channel被激活            pipeline.fireChannelActive();        }    } catch (Throwable t) {        // Close the channel directly to avoid FD leak.        closeForcibly();        closeFuture.setClosed();        if (!promise.tryFailure(t)) {            logger.warn(                    "Tried to fail the registration promise, but it is complete already. " +                            "Swallowing the cause of the registration failure:", t);        }    }}// AbstractNioChannel.doRegister()protected void doRegister() throws Exception {    boolean selected = false;    for (;;) {        try {            selectionKey = javaChannel().register(eventLoop().selector, 0, this);            return;        } catch (CancelledKeyException e) {            if (!selected) {                // Force the Selector to select now as the "canceled" SelectionKey may still be                // cached and not removed because no Select.select(..) operation was called yet.                eventLoop().selectNow();                selected = true;            } else {                // We forced a select operation on the selector before but the SelectionKey is still cached                // for whatever reason. JDK bug ?                throw e;            }        }    }}

2. bind方法

  bind方法主要用于绑定指定端口。对于服务端,用于绑定监听端口,并设置backlog参数;对于客户端,用于指定客户端Channel的本地绑定Socket地址。

public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {    if (!ensureOpen(promise)) {        return;    }    // See: https://github.com/netty/netty/issues/576    if (!PlatformDependent.isWindows() && !PlatformDependent.isRoot() &&        Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&        localAddress instanceof InetSocketAddress &&        !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress()) {        // Warn a user about the fact that a non-root user can't receive a        // broadcast packet on *nix if the socket is bound on non-wildcard address.        logger.warn(                "A non-root user can't receive a broadcast packet if the socket " +                "is not bound to a wildcard address; binding to a non-wildcard " +                "address (" + localAddress + ") anyway as requested.");    }        // 是否是激活状态    boolean wasActive = isActive();    try {        doBind(localAddress);    } catch (Throwable t) {        promise.setFailure(t);        closeIfClosed();        return;    }    if (!wasActive && isActive()) {        // 如果是在绑定阶段成为active状态,则将调用fireChannelActive方法放进NioEventLoop执行队列中        invokeLater(new Runnable() {            @Override            public void run() {                pipeline.fireChannelActive();            }        });    }    promise.setSuccess();}private void invokeLater(Runnable task) {    eventLoop().execute(task);}

NioSocketChannel 的 diBind 实现:

protected void doBind(SocketAddress localAddress) throws Exception {    javaChannel().socket().bind(localAddress);}

NioServerSocketChannel 的 doBind 实现:

protected void doBind(SocketAddress localAddress) throws Exception {    javaChannel().socket().bind(localAddress, config.getBacklog());}

3. disconnect方法

  该方法用于客户端或服务端主动关闭连接。

public final void disconnect(final ChannelPromise promise) {    boolean wasActive = isActive();    try {        doDisconnect();    } catch (Throwable t) {        promise.setFailure(t);        closeIfClosed();        return;    }    if (wasActive && !isActive()) {        invokeLater(new Runnable() {            @Override            public void run() {                pipeline.fireChannelInactive();            }        });    }    promise.setSuccess();    closeIfClosed(); // doDisconnect() might have closed the channel}

NioServerSocketChannel.doDisconnect():服务端不支持主动关闭连接

protected void doDisconnect() throws Exception {    throw new UnsupportedOperationException();}

NioSocketChannel.doDisconnect():调用SocketChannel关闭连接

protected void doDisconnect() throws Exception {    doClose();}protected void doClose() throws Exception {    javaChannel().close();}

4. close方法

public final void close(final ChannelPromise promise) {    // 1. 是否处于刷新状态,如果处于刷新状态说明还有消息没发出去,需要等到所有消息发完后再关闭    // 放入队列中处理    if (inFlush0) {        invokeLater(new Runnable() {            @Override            public void run() {                close(promise);            }        });        return;    }    // 2. 判断关闭操作是否完成,如果已完成,则不需要重复关闭链路,设置promise成功即可    if (closeFuture.isDone()) {        // Closed already.        promise.setSuccess();        return;    }    // 3. 执行关闭操作,将消息发送缓冲数组置空,通知JVM回收    boolean wasActive = isActive();    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    this.outboundBuffer = null; // Disallow adding any messages and flushes to outboundBuffer.    try {        // 4. 关闭链路,本质是调用javaChannel的close方法        doClose();        closeFuture.setClosed();        promise.setSuccess();    } catch (Throwable t) {        closeFuture.setClosed();        promise.setFailure(t);    }    // 5. 调用ChannelOutboundBuffer.close()释放缓冲区消息,将链路关闭通知事件放进NioEventLoop执行队列中    try {        outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);        outboundBuffer.close(CLOSED_CHANNEL_EXCEPTION);    } finally {        if (wasActive && !isActive()) {            invokeLater(new Runnable() {                @Override                public void run() {                    pipeline.fireChannelInactive();                }            });        }        // 6. 将Channel从多路复用器上取消注册        deregister();    }}protected void doDeregister() throws Exception {    eventLoop().cancel(selectionKey());}// 实际上就是将SelectionKey对应的Channel从多路复用器上去取消注册void cancel(SelectionKey key) {    key.cancel();    cancelledKeys ++;    if (cancelledKeys >= CLEANUP_INTERVAL) {        cancelledKeys = 0;        needsToSelectAgain = true;    }}

5. write方法

  write方法实际上是将消息添加到环形发送数组上,并不真正的写Channel(真正的写Channel是flush方法)。

public void write(Object msg, ChannelPromise promise) {    if (!isActive()) {        // 未激活,TCP链路还没建立成功,根据Channel打开情况设置不同的异常        if (isOpen()) {            promise.tryFailure(NOT_YET_CONNECTED_EXCEPTION);        } else {            promise.tryFailure(CLOSED_CHANNEL_EXCEPTION);        }        // 无法发送,释放msg对象        ReferenceCountUtil.release(msg);    } else {        // 链路状态正常,将数据和promise放进发送缓冲区        outboundBuffer.addMessage(msg, promise);    }}

6. flush方法

  前面提到,write方法负责将消息放进发送缓冲区,并没有真正的发送,而flush方法就负责将发送缓冲区中待发送的消息全部写进Channel中并发送。

public void flush() {    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null) {        return;    }    // 先将unflush指针修改为tail,标识本次发送的范围    outboundBuffer.addFlush();    flush0();}protected void flush0() {    if (inFlush0) {        // Avoid re-entrance        return;    }    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;    if (outboundBuffer == null || outboundBuffer.isEmpty()) {        return;    }    inFlush0 = true;    // Mark all pending write requests as failure if the channel is inactive.    if (!isActive()) {        try {            if (isOpen()) {                outboundBuffer.failFlushed(NOT_YET_CONNECTED_EXCEPTION);            } else {                outboundBuffer.failFlushed(CLOSED_CHANNEL_EXCEPTION);            }        } finally {            inFlush0 = false;        }        return;    }    try {        // 调用NioSocketChannel的write方法        doWrite(outboundBuffer);    } catch (Throwable t) {        outboundBuffer.failFlushed(t);    } finally {        inFlush0 = false;    }}

三、AbstractNioUnsafe源码分析

1. connect方法

  前面说到,NioSocketChannel的连接操作有三种可能:

    1. 连接成功

    2. 连接失败,关闭客户端连接

    3. 连接暂未响应,监听OP_CONNECT

  在connect方法中,如果连接成功,进行激活操作;如果连接暂未响应,则对其做一个监听,监听的内容是:如果连接失败,则关闭链路。

public final void connect(final SocketAddress remoteAddress, final SocketAddress localAddress, final ChannelPromise promise) {    // 设置不可取消 && Channel是打开状态    if (!promise.setUncancellable() || !ensureOpen(promise)) {        return;    }    try {        if (connectPromise != null) {            // 已经有一个连接正在处理,直接抛异常            throw new ConnectionPendingException();        }        boolean wasActive = isActive();                // doConnect方法具体看NioSocketChannel.doConnect()实现        if (doConnect(remoteAddress, localAddress)) {            // 连接成功,进行连接后操作            fulfillConnectPromise(promise, wasActive);        } else {            // 连接失败,TCP无应答,结果暂未知晓            connectPromise = promise;            requestedRemoteAddress = remoteAddress;            // Schedule connect timeout.            int connectTimeoutMillis = config().getConnectTimeoutMillis();            if (connectTimeoutMillis > 0) {                connectTimeoutFuture = eventLoop().schedule(new Runnable() {                    @Override                    public void run() {                        ChannelPromise connectPromise = AbstractNioChannel.this.connectPromise;                        ConnectTimeoutException cause =                                new ConnectTimeoutException("connection timed out: " + remoteAddress);                        if (connectPromise != null && connectPromise.tryFailure(cause)) {                            close(voidPromise());                        }                    }                }, connectTimeoutMillis, TimeUnit.MILLISECONDS);            }            promise.addListener(new ChannelFutureListener() {                @Override                public void operationComplete(ChannelFuture future) throws Exception {                    if (future.isCancelled()) {                        if (connectTimeoutFuture != null) {                            connectTimeoutFuture.cancel(false);                        }                        connectPromise = null;                        close(voidPromise());                    }                }            });        }    } catch (Throwable t) {        promise.tryFailure(annotateConnectException(t, remoteAddress));        closeIfClosed();    }}private void fulfillConnectPromise(ChannelPromise promise, boolean wasActive) {    if (promise == null) {        // Closed via cancellation and the promise has been notified already.        return;    }    // 判断当前激活状态    boolean active = isActive();    // 如果用户取消了连接,则返回false,需调用close方法关闭链路    boolean promiseSet = promise.trySuccess();    // 如果doConnect之前未激活,doConnect之后激活了,需要调用fireChannelActive(即使被取消了也应该调)    if (!wasActive && active) {        pipeline().fireChannelActive();    }    // If a user cancelled the connection attempt, close the channel, which is followed by channelInactive().    if (!promiseSet) {        close(voidPromise());    }}

2. finishConnect方法

  该方法用于判断连接操作是否结束。

  首先判断当前线程是否就是EventLoop执行线程,不允许其他线程操作;

  缓存当前active状态,用以下面是否要执行fireChannelActive方法;

  调用javaChannel的finishConnect方法,该方法返回三种情况:

    1)连接成功,返回true

    2)连接失败,返回false

    3)发生链路被关闭、链路中断异常,连接失败

  根据javaChannel的返回值,如果返回false,直接抛出error,进入到catch模块

  然后就根据连接状态做不同的后续处理

public final void finishConnect() {    // Note this method is invoked by the event loop only if the connection attempt was    // neither cancelled nor timed out.    assert eventLoop().inEventLoop();    try {        boolean wasActive = isActive();        // 通过javaChannel的finishConnect方法判断连接结果(如果连接失败则抛出Error,会走到catch块里)        doFinishConnect();        // 连接成功方法:fulfillConnectPromise(ChannelPromise promise, boolean wasActive)        fulfillConnectPromise(connectPromise, wasActive);    } catch (Throwable t) {        // 关闭链路方法:fulfillConnectPromise(ChannelPromise promise, Throwable cause)        fulfillConnectPromise(connectPromise, annotateConnectException(t, requestedRemoteAddress));    } finally {        // 如果连接超时时仍然没有收到服务端应答,则由定时任务关闭客户端连接,将SocketChannel从多路复用器上删除        if (connectTimeoutFuture != null) {            connectTimeoutFuture.cancel(false);        }        connectPromise = null;    }}

四、NioByteUnsafe源码分析

  这里我们主要分析下它的 read方法。

public final void read() {            final ChannelConfig config = config();            if (shouldBreakReadReady(config)) {                clearReadPending();                return;            }            final ChannelPipeline pipeline = pipeline();            final ByteBufAllocator allocator = config.getAllocator();            final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();            allocHandle.reset(config);            ByteBuf byteBuf = null;            boolean close = false; ...

public RecvByteBufAllocator.Handle recvBufAllocHandle() {

  if (recvHandle == null) {
    recvHandle = config().getRecvByteBufAllocator().newHandle();
  }
  return recvHandle;
}

首先,获取NioSocketChannel的SocketChannelConfig,用于设置客户端连接的TCP参数。

继续看allocHandle的初始化,则从SocketChannelConfig的RecvByteBufAllocator中创建一个新的handle。

RecvByteBufAllocator有两个实现,分别是FixedRecvByteBufAllocator 和 AdaptiveRecvByteBufAllocator。FixedRecvByteBufAllocator 比较简单,我们主要分析下AdaptiveRecvByteBufAllocator。

根据名称就可以判断,AdaptiveRecvByteBufAllocator是根据本地读取的字节数动态调整下次接收缓冲区容量

我们先看下AdaptiveRecvByteBufAllocator的 成员变量:

static final int DEFAULT_MINIMUM = 64;//最小缓冲区长度static final int DEFAULT_INITIAL = 1024;//初始容量static final int DEFAULT_MAXIMUM = 65536;//最大容量private static final int INDEX_INCREMENT = 4;//动态调整扩张步进索引private static final int INDEX_DECREMENT = 1;//动态调整收缩步进索引private static final int[] SIZE_TABLE;//长度向量表,数组的每个值对应一个Buffer容量// 初始化长度向量表// 当容量小于512时,由于缓冲区已经比较小,需要降低步进值,容量每次下调幅度降低// 当容量大于512时,说明需要解码的消息码流比较大,需要采用调大步进幅度的方式降低动态扩张频率static {    List
sizeTable = new ArrayList
(); for (int i = 16; i < 512; i += 16) { sizeTable.add(i); } for (int i = 512; i > 0; i <<= 1) { sizeTable.add(i); } SIZE_TABLE = new int[sizeTable.size()]; for (int i = 0; i < SIZE_TABLE.length; i ++) { SIZE_TABLE[i] = sizeTable.get(i); }}

然后再来看一下AdaptiveRecvByteBufAllocator.getSizeTableIndex(..)方法:根据容量size查找容器向量表对应的索引

private static int getSizeTableIndex(final int size) {    for (int low = 0, high = SIZE_TABLE.length - 1;;) {        if (high < low) {            return low;        }        if (high == low) {            return high;        }        int mid = low + high >>> 1;        int a = SIZE_TABLE[mid];        int b = SIZE_TABLE[mid + 1];        if (size > b) {            low = mid + 1;        } else if (size < a) {            high = mid - 1;        } else if (size == a) {            return mid;        } else {            return mid + 1;        }    }}

然后我们再来看一下AdaptiveRecvByteBufAllocator的静态内部类HandlerImpl,该类有五个成员变量:

private final int minIndex;            //最小索引private final int maxIndex;            //最大索引private int index;                    //当前索引private int nextReceiveBufferSize;    //下一次预分配的Buffer大小private boolean decreaseNow;        //是否立即执行容量收缩操作

该类有一个比较重要的方法,record(int actualReadBytes),当NioSocketChannel执行完读操作后,会计算获得本轮轮询读取的总字节数,也就是record方法的入参actualReadBytes,该方法根据读取的字节数对ByteBuf进行动态伸缩和扩张。record操作步骤如下

  1)将当前容量缩减后的值与实际读取的值做比较,如果实际读取的值小于收缩后的容量,则将缓冲区容量降低

  2)如果实际读取的值大于当前Buffer容量,说明实际分配容量不足,需要动态扩张

private void record(int actualReadBytes) {    if (actualReadBytes <= SIZE_TABLE[max(0, index - INDEX_DECREMENT - 1)]) {        if (decreaseNow) {            index = max(index - INDEX_DECREMENT, minIndex);            nextReceiveBufferSize = SIZE_TABLE[index];            decreaseNow = false;        } else {            decreaseNow = true;        }    } else if (actualReadBytes >= nextReceiveBufferSize) {        index = min(index + INDEX_INCREMENT, maxIndex);        nextReceiveBufferSize = SIZE_TABLE[index];        decreaseNow = false;    }}

AdaptiveRecvByteBufAllocator优点总结:

1. 性能更高。容量过大会导致内存占用开销增加,后续的Buffer处理性能会下降;容量过小时需要频繁的内存扩张来接收大的请求消息,同样会导致性能下降

2. 更节约内存。根据不同的场景动态的扩张或缩减内存,达到内存使用最优化。

 

然后我们接着来分析 read方法,这里循环读取缓冲区数据,并根据上次读取字节数动态调整ByteBuffer大小。每次读取都要触发一次read事件 fireChannelRead,注意,这里并不是说一次read就读完了全部消息,可能存在粘包拆包情况。

当上次读取了0个字节,说明已经读完了,跳出循环,触发读操作完成事件 fireChannelReadComplete。

public final void read() {        final ChannelConfig config = config();        if (shouldBreakReadReady(config)) {            clearReadPending();            return;        }        final ChannelPipeline pipeline = pipeline();        final ByteBufAllocator allocator = config.getAllocator();        final RecvByteBufAllocator.Handle allocHandle = recvBufAllocHandle();        allocHandle.reset(config);        ByteBuf byteBuf = null;        boolean close = false;        try {            do {                // 通过接收缓冲区分配器计算下次预分配的缓冲区容量并创建ByteBuffer                byteBuf = allocHandle.allocate(allocator);                // 这里分两步:1. doReadBytes(byteBuf):调用NioSocketChannel.doReadBytes(..),返回本次读取的字节数(返回0-无消息   返回小于0-发生了IO异常)                // 2. 设置lastBytesRead,用以下面的处理                allocHandle.lastBytesRead(doReadBytes(byteBuf));                if (allocHandle.lastBytesRead() <= 0) {                    // 走到这里说明上一步没有读取到数据,释放ByteBuffer                    byteBuf.release();                    byteBuf = null;                    close = allocHandle.lastBytesRead() < 0;                    if (close) {                        // 发生了IO异常,需关闭连接                        readPending = false;                    }                    break;                }                allocHandle.incMessagesRead(1);                readPending = false;                // 一次读操作,触发一次read事件                pipeline.fireChannelRead(byteBuf);                byteBuf = null;            } while (allocHandle.continueReading());                        allocHandle.readComplete();            // 触发读操作结束事件            pipeline.fireChannelReadComplete();            if (close) {                closeOnRead(pipeline);            }        } catch (Throwable t) {            handleReadException(pipeline, byteBuf, t, close, allocHandle);        } finally {            // Check if there is a readPending which was not processed yet.            // This could be for two reasons:            // * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method            // * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method            //            // See https://github.com/netty/netty/issues/2254            if (!readPending && !config.isAutoRead()) {                removeReadOp();            }        }    }}protected int doReadBytes(ByteBuf byteBuf) throws Exception {    final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();    allocHandle.attemptedBytesRead(byteBuf.writableBytes());    return byteBuf.writeBytes(javaChannel(), allocHandle.attemptedBytesRead());}public int writeBytes(ScatteringByteChannel in, int length) throws IOException {    ensureWritable(length);    int writtenBytes = setBytes(writerIndex, in, length);    if (writtenBytes > 0) {        writerIndex += writtenBytes;    }    return writtenBytes;}

 

转载于:https://www.cnblogs.com/lovezmc/p/11556412.html

你可能感兴趣的文章
solr后台操作Documents之增删改查
查看>>
http://yusi123.com/
查看>>
文件文本的操作
查看>>
Ubuntu linux下gcc版本切换
查看>>
记一次Web服务的性能调优
查看>>
jQuery.form.js使用
查看>>
(转)linux sort,uniq,cut,wc命令详解
查看>>
关于ExecuteNonQuery执行的返回值(SQL语句、存储过程)
查看>>
UVa540 Team Queue(队列queue)
查看>>
mysql数据增删改查
查看>>
akka之种子节点
查看>>
不知道做什么时
查看>>
matlab 给某一列乘上一个系数
查看>>
密码学笔记——培根密码
查看>>
Screening technology proved cost effective deal
查看>>
MAC 上升级python为最新版本
查看>>
创业老板不能犯的十种错误
查看>>
Animations介绍及实例
查看>>
判断请求是否为ajax请求
查看>>
【POJ2699】The Maximum Number of Strong Kings(网络流)
查看>>