JDK源码阅读——InterruptibleChannel 与可中断 IO

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> JDK源码阅读——InterruptibleChannel 与可中断 IO

来源:木杉的博客 ,
imushan.com/2018/08/01/java/language/JDK源码阅读-InterruptibleChannel与可中断IO/

Java传统IO是不支持中断的,所以如果代码在read/write等操作阻塞的话,是无法被中断的。这就无法和Thead的interrupt模型配合使用了。JavaNIO众多的升级点中就包含了IO操作对中断的支持。InterruptiableChannel表示支持中断的Channel。我们常用的FileChannel,SocketChannel,DatagramChannel都实现了这个接口。

InterruptibleChannel接口


public interface InterruptibleChannel extends Channel

{

 

    /**

     * 关闭当前Channel

     *     

     * 任何当前阻塞在当前channel执行的IO操作上的线程,都会收到一个AsynchronousCloseException异常

     */

    public void close() throws IOException;

}

InterruptibleChannel接口没有定义任何方法,其中的close方法是父接口就有的,这里只是添加了额外的注释。

AbstractInterruptibleChannel实现了InterruptibleChannel接口,并提供了实现可中断IO机制的重要的方法,比如begin(),end()。

在解读这些方法的代码前,先了解一下NIO中,支持中断的Channel代码是如何编写的。

第一个要求是要正确使用begin()和end()方法:


boolean completed = false;

try {

    begin();

    completed = ...;    // 执行阻塞IO操作

    return ...;         // 返回结果

} finally {

    end(completed);

}

NIO规定了,在阻塞IO的语句前后,需要调用begin()和end()方法,为了保证end()方法一定被调用,要求放在finally语句块中。

第二个要求是Channel需要实现java.nio.channels.spi.AbstractInterruptibleChannel#implCloseChannel这个方法。AbstractInterruptibleChannel在处理中断时,会调用这个方法,使用Channel的具体实现来关闭Channel。

接下来我们具体看一下begin()和end()方法是如何实现的。

begin方法


// 保存中断处理对象实例

private Interruptible interruptor;

// 保存被中断线程实例

private volatile Thread interrupted;

 

protected final void begin() {

    // 初始化中断处理对象,中断处理对象提供了中断处理回调

    // 中断处理回调登记被中断的线程,然后调用implCloseChannel方法,关闭Channel

    if (interruptor == null) {

        interruptor = new Interruptible() {

            public void interrupt(Thread target) {

                synchronized (closeLock) {

                    // 如果当前Channel已经关闭,则直接返回

                    if (!open)

                        return;

 

                    // 设置标志位,同时登记被中断的线程

                    open = false;

                    interrupted = target;

                    try {

                        // 调用具体的Channel实现关闭Channel

                        AbstractInterruptibleChannel.this.implCloseChannel();

                    } catch (IOException x) { }

                }

            }};

    }

    // 登记中断处理对象到当前线程

    blockedOn(interruptor);

 

    // 判断当前线程是否已经被中断,如果已经被中断,可能登记的中断处理对象没有被执行,这里手动执行一下

    Thread me = Thread.currentThread();

    if (me.isInterrupted())

        interruptor.interrupt(me);

}

从begin()方法中,我们可以看出NIO实现可中断IO操作的思路,是在Thread的中断逻辑中,挂载自定义的中断处理对象,这样Thread对象在被中断时,会执行中断处理对象中的回调,这个回调中,执行关闭Channel的操作。这样就实现了Channel对线程中断的响应了。

接下来重点就是研究“Thread添加中断处理逻辑”这个机制是如何实现的了,是通过blockedOn方法实现的:


static void blockedOn(Interruptible intr) {         // package-private

    sun.misc.SharedSecrets.getJavaLangAccess().blockedOn(Thread.currentThread(),intr);

}

blockedOn方法使用的是JavaLangAccess的blockedOn方法。

SharedSecrets是一个神奇而糟糕的类,为啥说是糟糕呢,因为这个方法的存在,就是为了访问JDK类库中一些因为类作用域限制而外部无法访问的类或者方法。JDK很多类与方法是私有或者包级别私有的,外部是无法访问的,但是JDK在本身实现的时候又存在互相依赖的情况,所以为了外部可以不依赖反射访问这些类或者方法,在sun包下,存在这么一个类,提供了各种超越限制的方法。

SharedSecrets.getJavaLangAccess()方法返回JavaLangAccess对象。JavaLangAccess对象就和名称所说的一样,提供了java.lang包下一些非公开的方法的访问。这个类在System初始化时被构造:


// java.lang.System#setJavaLangAccess

private static void setJavaLangAccess() {

    sun.misc.SharedSecrets.setJavaLangAccess(new sun.misc.JavaLangAccess(){

        public void blockedOn(Thread t, Interruptible b) {

            t.blockedOn(b);

        }

        //...

    });

}

可以看出,sun.misc.JavaLangAccess#blockedOn保证的就是java.lang.Thread#blockedOn这个包级别私有的方法:


/* The object in which this thread is blocked in an interruptible I/O

 * operation, if any.  The blocker's interrupt method should be invoked

 * after setting this thread's interrupt status.

 */

private volatile Interruptible blocker;

private final Object blockerLock = new Object();

 

/* Set the blocker field; invoked via sun.misc.SharedSecrets from java.nio code

 */

void blockedOn(Interruptible b) {

    // 串行化blocker相关操作

    synchronized (blockerLock) {

        blocker = b;

    }

}

而这个方法也非常简单,就是设置java.lang.Thread#blocker变量为之前提到的中断处理对象。而且从注释中可以看出,这个方法就是专门为NIO设计的,注释都非常直白的提到了,NIO的代码会通过sun.misc.SharedSecrets调用到这个方法。。

接下来就是重头戏了,看一下Thread在中断时,如何调用NIO注册的中断处理器:


public void interrupt() {

    if (this != Thread.currentThread())

        checkAccess();

 

    synchronized (blockerLock) {

        Interruptible b = blocker;

 

        // 如果NIO设置了中断处理器,则只需Thread本身的中断逻辑后,调用中断处理器的回调函数

        if (b != null) {

            interrupt0();           // 这一步会设置interrupt标志位

            b.interrupt(this);

            return;

        }

    }

 

    // 如果没有的话,就走普通流程

    interrupt0();

}
JDK源码阅读:InterruptibleChannel 与可中断 IO

end方法

begin()方法负责添加Channel的中断处理器到当前线程。end()是在IO操作执行完/中断完后的操作,负责判断中断是否发生,如果发生判断是当前线程发生还是别的线程中断把当前操作的Channel给关闭了,对于不同的情况,抛出不同的异常。


protected final void end(boolean completed) throws AsynchronousCloseException

{

    // 清空线程的中断处理器引用,避免线程一直存活导致中断处理器无法被回收

    blockedOn(null);

    Thread interrupted = this.interrupted;

 

    if (interrupted != null && interrupted == Thread.currentThread()) {

        interrupted = null;

        throw new ClosedByInterruptException();

    }

    // 如果这次没有读取到数据,并且Channel被另外一个线程关闭了,则排除Channel被异步关闭的异常

    // 但是如果这次读取到了数据,就不能抛出异常,因为这次读取的数据是有效的,需要返回给用户的(重要逻辑)

    if (!completed && !open)

        throw new AsynchronousCloseException();

}

通过代码可以看出,如果是当前线程被中断,则抛出ClosedByInterruptException异常,表示Channel因为线程中断而被关闭了,IO操作也随之中断了。

如果是当前线程发现Channel被关闭了,并且是读取还未执行完毕的情况,则抛出AsynchronousCloseException异常,表示Channel被异步关闭了。

end()逻辑的活动图如下:

JDK源码阅读:InterruptibleChannel 与可中断 IO

场景分析

并发的场景分析起来就是复杂,上面的代码不多,但是场景很多,我们以sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)为例分析一下可能的场景:

  • A线程read,B线程中断A线程:A线程抛出ClosedByInterruptException异常
  • A,B线程read,C线程中断A线程
  • A,B线程read,C线程中断A线程

  • A被中断时,B刚刚进入read方法:A线程抛出ClosedByInterruptException异常,B线程ensureOpen方法抛出ClosedChannelException异常
  • A被中断时,B阻塞在底层read方法中:A线程抛出ClosedByInterruptException异常,B线程底层方法抛出异常返回,end方法中抛出AsynchronousCloseException异常
  • A被中断时,B已经读取到数据:A线程抛出ClosedByInterruptException异常,B线程正常返回
  • sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer)代码如下:

    
    public int read(ByteBuffer dst) throws IOException {
    
        ensureOpen();  // 1
    
        if (!readable) // 2
    
            throw new NonReadableChannelException();
    
        synchronized (positionLock) {
    
            int n = 0;
    
            int ti = -1;
    
            try {            
    
                begin();
    
                ti = threads.add();
    
                if (!isOpen())
    
                    return 0; // 3
    
                do {
    
                    n = IOUtil.read(fd, dst, -1, nd); // 4
    
                } while ((n == IOStatus.INTERRUPTED) && isOpen());
    
                return IOStatus.normalize(n);
    
            } finally {
    
                threads.remove(ti);
    
                end(n  0);
    
                assert IOStatus.check(n);
    
            }
    
        }
    
    }
    

    总结

    在JavaIO时期,人们为了中断IO操作想了不少方法,核心操作就是关闭流,促使IO操作抛出异常,达到中断IO的效果。NIO中,将这个操作植入了java.lang.Thread#interrupt方法,免去用户自己编码特定代码的麻烦。使IO操作可以像其他可中断方法一样,在中断时抛出ClosedByInterruptException异常,业务程序捕获该异常即可对IO中断做出响应。

    参考资料

  • java – What does JavaLangAccess.blockedOn(Thread t, Interruptible b) do? – Stack Overflow  https://stackoverflow.com/questions/8544891/what-does-javalangaccess-blockedonthread-t-interruptible-b-do
  • Java NIO 那些躲在角落的细节

    https://www.oschina.net/question/138146_26027

  • 本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> JDK源码阅读——InterruptibleChannel 与可中断 IO


     上一篇
    JDK 源码阅读 ——ByteBuffer JDK 源码阅读 ——ByteBuffer
    来源:木杉的博客 ,imushan.com/2018/08/07/java/language/JDK源码阅读-ByteBuffer/ Buffer是Java NIO中
    2021-04-05
    下一篇 
    JDK动态代理-超详细源码分析 JDK动态代理-超详细源码分析
    来源:小北觅jianshu.com/p/269afd0a52e6 前言:  最近学习Hadoop系列的知识,免不了和RPC打交道,而RPC又使用动态代理来实现。沿着这条脉络,秉着钻研的精神,准备深入学习一下动态代理。
    2021-04-05