并发编程系列之Condition接口

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 并发编程系列之Condition接口

并发编程系列之Condition接口

前言

前面我们学习线程的时候讲过等待通知模式,之前讲的是通过wait,notify/notifyAll配合synchronized关键字,实现等待通知,今天呢,我们介绍另外一种同样实现等待通知模式的对象叫做condition接口,配合lock使用,也能完成等待通知,但是跟之前说的又有一些区别,今天就让我们来认识一下吧,OK,开始我们今天的并发之旅吧,祝您旅途愉快。

什么是Condition接口?

Condition定义了等待/通知两种类型的方法,当前线程如果调用这些方法之前,必须先获取到condition对象关联的锁,Condition对象是由Lock对象创建出来的,也就是说Condition是绑定在一个Lock对象上的,依赖于Lock对象,使用时需要通过Lock对象new出来。相比于之前的wait/notify而言,condition充当wait/notify的角色,而lock对象充当synchronized锁角色。

我们看下Condition接口提供的一些方法如下:

并发编程系列之Condition接口

如何使用Condition实现等待通知

案例1:一个condition即1个等待队列

通过上面我们知道要使用condition提供的方法就必须将它绑定一个Lock对象,然后用法和wait/notify差不多,我们看下面demo


public class ConditionDemo {
    // 创建重入锁
    private Lock lock = new ReentrantLock();
    // Lock对象创建condition对象
    private Condition condition = lock.newCondition();

    public void method1(){
        try {
            lock.lock();
            System.out.println("当前线程:" + Thread.currentThread().getName() + "获取锁,并睡眠3秒..");
            Thread.sleep(3000);
            System.out.println("当前线程:" + Thread.currentThread().getName() + "释放锁..进入等待状态");
            // 等待
            condition.await();
            System.out.println("当前线程:" + Thread.currentThread().getName() +"被唤醒,继续执行...");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void method2(){
        try {
            lock.lock();
            System.out.println("当前线程:" + Thread.currentThread().getName() + "获取锁,进入...睡眠3秒");
            Thread.sleep(3000);
            System.out.println("当前线程:" + Thread.currentThread().getName() + "发出唤醒..,并释放锁");
            // 唤醒,同样要注意多线程下死锁的发生,优先使用signalAll()唤醒所有等待此条件的线程
            condition.signal();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws Exception{
        final ConditionDemo uc = new ConditionDemo();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                uc.method1();
            }
        }, "线程1");
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                uc.method2();
            }
        }, "线程2");
        System.out.println("线程1启动。。。");
        t1.start();
        Thread.sleep(1000);
        System.out.println("线程2启动。。。");
        t2.start();
    }
}

执行结果如下:

并发编程系列之Condition接口

案例2:多个Condition,即多个等待队列


public class ConditionDemo {

    // 定义重入锁
    private ReentrantLock lock = new ReentrantLock();
    // condition1
    private Condition c1 = lock.newCondition();
    // condition2
    private Condition c2 = lock.newCondition();

    public void m1(){
        try {
            lock.lock();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m1等待..");
            c1.await();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m1继续..");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void m2(){
        try {
            lock.lock();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m2等待..");
            c1.await();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m2继续..");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void m3(){
        try {
            lock.lock();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "进入方法m3等待..");
            c2.await();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "方法m3继续..");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void m4(){
        try {
            lock.lock();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
            // 唤醒所有condition1上的等待线程
            c1.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void m5(){
        try {
            lock.lock();
            System.out.println("当前线程:" +Thread.currentThread().getName() + "唤醒..");
            // 唤醒所有condition2上的等待线程
            c2.signalAll();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        final ConditionDemo umc = new ConditionDemo();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                umc.m1();
            }
        },"Thread1");
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                umc.m2();
            }
        },"Thread2");
        Thread t3 = new Thread(new Runnable() {
            @Override
            public void run() {
                umc.m3();
            }
        },"Thread3");
        Thread t4 = new Thread(new Runnable() {
            @Override
            public void run() {
                umc.m4();
            }
        },"Thread4");
        Thread t5 = new Thread(new Runnable() {
            @Override
            public void run() {
                umc.m5();
            }
        },"Thread5");

        // condition1
        t1.start();
        // condition1
        t2.start();
        // condition2
        t3.start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 唤醒condition1
        t4.start();
        try {
            Thread.sleep(2000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        // 唤醒condition2
        t5.start();
    }
}

执行结果如下:

并发编程系列之Condition接口

Condition底层实现

Condition接口源码中提供的是如下这几个方法,并没有具体实现,


public interface Condition {

    void await() throws InterruptedException;
    void awaitUninterruptibly();
    long awaitNanos(long nanosTimeout) throws InterruptedException;
    boolean await(long time, TimeUnit unit) throws InterruptedException;
    boolean awaitUntil(Date deadline) throws InterruptedException;
    void signal();
    void signalAll();
}

具体实现主要主要体现在ConditionObject这个类上,这个类是AQS的内部类,每个ConditionObject对象都包含一个等待队列,下面我们就分别从等待队列,等待,通知三个方面分析ConditionObject的底层实现原理

等待队列

一个condition包含一个等待队列(FIFO),condition拥有一个首节点和一个尾节点,如下图:condition中拥有首位节点的引用,当新增节点时,只需要将原来尾节点的下个节点指向它,并且更新尾节点接口,该更新操作跟之前同步队列中CAS更新尾节点不同,此处不需要CAS操作,因为condition的操作是在一个lock里面进行的,是已经获取锁的,所以这个操作是线程安全的;

并发编程系列之Condition接口

在前面所讲的wait等待通知模型中其实同步器是只有一个同步队列和一个等待队列的,而在我们这里,因为可以同时多个condition,上面案例也有使用过,也就是说,condition实现的同步器中,其实是一个同步队列和多个等待队列,从condition是AQS一个内部类也可以证实这一点,也就是说我们可以创建多个condition,每个condition都可以访问AQS提供的方法,相当于每个condition都持有所属AQS的引用,其关系模型如下:

并发编程系列之Condition接口

等待

调用Condition的await开头的系列方法,会使当前线程进入等待队列等待并释放锁,线程状态变为等待状态,已上图为例,就是同步队列中的首节点(或者获取锁的节点,因为非公平性锁就不一定是首节点)移动到了Condition的等待队列中,这里关键的就是等待方法await,我们来看下其源码实现:


public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            // 当前线程加入等待队列,并不是直接加入,而是把当前线程构造成一个新的节点再加入
            Node node = addConditionWaiter();
            // 释放同步状态即释放锁
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            // 唤醒同步队列中后续节点,线程进入等待
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            // 调用acquireQueued尝试获取同步状态,获取成功后,线程中断返回
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

整个过程示意图如下:

并发编程系列之Condition接口

通知

调用Condition的signal()方法将会唤醒在等待队列中的首节点,该节点也是到目前为止等待时间最长的节点,等待队列遵循FIFO原则。调用signalAll()方法将会唤醒该同步器上等待队列中的所有节点,我们看signal方法源码分析:


public final void signal() {
            // 前置检查,判断当前线程是否是获取了锁的线程,如果不是抛出异常
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            // 取得等待队列的头结点,头结点不为空执行doSignal,否则,唤醒结束
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

我们再来看下doSignal的源码:


private void doSignal(Node first) {
            // 调用transferForSignal将节点从等待队列移动到同步队列
            // 将该节点从等待队列删除
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

再往下看,我们追溯到transferForSignal方法:


final boolean transferForSignal(Node node) {
        // 将节点waitStatus设置为0
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
        // 调用enq方法将该节点加入同步队列
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws  0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            // 使用LockSuppor.unpark()方法唤醒该节点的线程
            LockSupport.unpark(node.thread);
        return true;
    }

signalAll()方法实现也差不多,我就不做过多讲解了,看下doSignlAll方法即可:


/**
  * Removes and transfers all nodes.
  * @param first (non-null) the first node on condition queue
 */
private void doSignalAll(Node first) {
  lastWaiter = firstWaiter = null;
    do {
       Node next = first.nextWaiter;
       first.nextWaiter = null;
       transferForSignal(first);
       first = next;
      } while (first != null);
  }

我们再来看下节点在队列中的变化过程,如下图所示:

并发编程系列之Condition接口

以上就是今天等待通知机制condition的全部内容,结合wait/notify+synchronized,对比学习,希望能对您有所收获!!!

相关文章:

原文始发于微信公众号(Justin的后端书架):

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 并发编程系列之Condition接口


 上一篇
并发编程系列之并发容器——ConcurrentHashMap 并发编程系列之并发容器——ConcurrentHashMap
前言 之前我们讲了线程,锁以及队列同步器等等一些并发相关底层的东西,当然Java开发者在开发中很少直接去使用之前所讲的,因为Java为了简化开发,为我们提供了一整套并发容器和框架,但是这些容器和框架都是建立在之前所讲的基础之上的,今天
2021-04-05
下一篇 
并发编程系列之重入锁VS读写锁 并发编程系列之重入锁VS读写锁
前言 上节我们介绍了Java中的锁基础篇,也算是对锁有了个基本的认识,对锁底层的一些原理有所掌握,那么今天我们就来看看2个最常见的锁的实例应用,重入锁和读写锁,这是今天旅途最美的两大景点,是不是有点迫不及待了,OK,那就让我们一起开启
2021-04-05