并发编程系列之线程之间的通信

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 并发编程系列之线程之间的通信

并发编程系列之线程之间的通信

前言

上节我们介绍了线程从创建到结束的过程,介绍了几种常见的启动和终止方法,我们知道了如何使用一个线程,那么今天我们再接下来看看1个或者多个线程之间是如何进行通信的?OK,让我们一起走进今天的并发之旅吧,祝您旅途愉快。

并发编程系列之线程之间的通信

景点一:共享变量(volatile)

关键字volatile可以用来修饰一个共享的变量字段,使用volatile修饰的变量,当访问该变量时需要从共享内存中获取最新值,而对该变量的更新,必须同步刷新回共享内存,这样就能保证所有线程对该变量访问的可见性;结合示例代码更容易理解:


public class RunThread extends Thread{

  private volatile static boolean isRunning = true;
  private void setRunning(boolean isRunning){
    RunThread.isRunning = isRunning;
  }
  
  public void run(){
    System.out.println("进入run方法..");
    while(isRunning == true){
    }
    System.out.println("线程感知isRunning值被设置成false,线程停止!!!");
  }
  
  public static void main(String[] args) throws InterruptedException {
    RunThread runThread1 = new RunThread();
    RunThread runThread2 = new RunThread();
    runThread1.start();
    runThread2.start();
    // 主线程睡眠1秒之后线程1将标识设为false
    Thread.sleep(1000);
    runThread1.setRunning(false);
    System.out.println("isRunning的值已经被设置了false");
    Thread.sleep(2000);
    System.out.println(runThread1.isAlive());
    System.out.println(runThread2.isAlive());
  }
}

结果:
进入run方法..
进入run方法..
isRunning的值已经被设置了false
线程感知isRunning值被设置成false,线程停止!!!
线程感知isRunning值被设置成false,线程停止!!!
false
false

我们可以看到,开启2个线程,线程运行终止的条件是isRunning是否为false,当2个线程都启动运行之后,主线程睡眠1秒,然后线程1将isRunning设为false,会发现,线程1和线程2立马感应到了isRunning值的更新,结束了线程的运行;

当去掉volatile修饰共享变量时,结果如下:2个线程都未感应isRunning的变化。

并发编程系列之线程之间的通信 并发编程系列之线程之间的通信

景点二:同步代码(synchronized)

synchronized修饰的方法或者代码块,主要是确保多个线程在同一时刻,只能有一个线程处于方法或者代码块中,保证了线程对变量访问的可见性和排他性,我们来看下面这段代码示例:


public class SynchronizedDemo {

    // 方法1 线程1将调用
    public synchronized void method1() {
        try {
            System.out.println(Thread.currentThread().getName() + "执行method1方法,并获取锁,主线程睡眠4秒");
            Thread.sleep(4000);
            System.out.println(Thread.currentThread().getName() + "释放锁");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    // 方法2 线程2将调用
    public synchronized void method2() {
        System.out.println(Thread.currentThread().getName() + "执行method2方法");
    }

    public static void main(String[] args) throws Exception {
        final SynchronizedDemo synchronizedDemo = new SynchronizedDemo();
        Thread t1 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronizedDemo.method1();
            }
        }, "线程1");
       // 为了保证线程1和2的执行顺序我让线程2延迟0.5秒再启动
        Thread.sleep(500);
        Thread t2 = new Thread(new Runnable() {
            @Override
            public void run() {
                synchronizedDemo.method2();
            }
        }, "线程2");
        t1.start();
        t2.start();
    }
}

执行结果如下:

并发编程系列之线程之间的通信

当我们把method2方法的synchronized去掉之后,再看执行结果:

并发编程系列之线程之间的通信

从上面示例我们就能看出:当加了synchronized修饰之后线程1首先启动并持有对象SynchronizedDemo的对象锁,线程2启动时,发现锁被线程1占用,处于等待状态,等待3.4秒之后线程1释放锁,线程2获得锁,执行method2方法。本示例中,synchronized同步的是普通方法,所以持有的是当前实例对象锁,如果是方法块,锁是synchronized包含的代码块。

我们去就提到过,synchronized是通过Monitor对象来实现同步的,那么我们今天来对这点做个补充,我们看下Monitor是如何工作的,如下图:

并发编程系列之线程之间的通信

总结:任意一个线程对Object对象(synchronized修饰)的访问,首先要获得Object对象的监视器,如果获取失败,该线程将进入同步队列等待,该线程状态变为阻塞(BLOCKED),当访问Object的前(已经获得锁的线程)线程释放了锁,则唤醒阻塞队列中的线程,使阻塞线程重新尝试对监视器的获取;

并发编程系列之线程之间的通信

景点三:等待/通知

等待/通知机制指的是一个线程1调用了对象的wait()方法进入等待状态,另一个线程2调用该对象的notifyAll()方法,线程1收到了通知之后从对象的wait()方法返回,进而执行后续的操作,两个线程通过对象来完成交互,而对象上的wait和notify/notifyAll的关系就像一个开关信号一样,用来完成等待方和通知方之间的交互工作。我们先来看看这几个方法:

  • notify():通知一个在对象上等待的线程,由WAITING状态变为BLOCKING状态,从等待队列移动到同步队列,等待CPU调度获取该对象的锁,当该线程获取到了对象的锁后,该线程从wait()方法返回;
  • notifyAll():通知所有等待在该对象上的线程,由WAITING状态变为BLOCKING状态,等待CPU调度获取该对象的锁;
  • wait():调用该方法的线程进入WAITING状态,并将当前线程放置到对象的等待队列,只有等待另外线程的通知或被中断才会返回,(需要注意,调用wait()方法后,会释放对象的锁);
  • wait(long):超时等待一段时间,参数为毫秒,也就是等待长达n毫秒,如果没有通知就超时返回;
  • wait(long,int):第一个参数为毫秒,第二个参数为纳秒,对超时返回更细粒度的控制;

  • notifyAll():通知所有等待在该对象上的线程,由WAITING状态变为BLOCKING状态,等待CPU调度获取该对象的锁;

    wait(long):超时等待一段时间,参数为毫秒,也就是等待长达n毫秒,如果没有通知就超时返回;

    那么这几个方法如何使用呢?我们来看下面2个示例:

    
    public class WaitAndNotifyDemo {
        // 定义一个list属性
        private volatile static List list = new ArrayList();
        // 提供一个add方法
        public void add() {
            list.add("justin");
        }
        // 获取list的大小
        public int size() {
            return list.size();
        }
    
        public static void main(String[] args) {
            final WaitAndNotifyDemo list2 = new WaitAndNotifyDemo();
            final Object lock = new Object();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 使用synchronized对象锁加锁
                        synchronized (lock) {
                            System.out.println("t1启动..");
                            for (int i = 0; i  6; i++) {
                                // 往list中添加元素,当list中添加了5个元素之后,唤醒等待对象锁的线程2
                                list2.add();
                                System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了第"+(i+1)+"个元素..");
                                Thread.sleep(500);
                                if (list2.size() == 5) {
                                    System.out.println("已经发出通知..");
                                    lock.notify();
                                }
                            }
                        }
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t1");
    
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    // synchronized对象锁加锁吗,当list的大小不为5时,就等待,wait会释放lock锁,t1才能获得锁
                    synchronized (lock) {
                        System.out.println("t2启动..");
                        if (list2.size() != 5) {
                            try {
                                lock.wait();
                            } catch (InterruptedException e) {
                                e.printStackTrace();
                            }
                        }
                        System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
                        throw new RuntimeException();
                    }
                }
            }, "t2");
            // 为了保证先启动线程2执行等待,所以我给线程1加了个延迟0.5秒启动
            t2.start();
            try {
                Thread.sleep(500);
                t1.start();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    

    执行结果如下:

    并发编程系列之线程之间的通信

    我们再看另一个例子,利用等待/通知机制我们来自定义实现一个有界阻塞队列ArrayBlockingQueue:

    
    public class DefinedArrayBlockingQueue {
    
        // 定义一个list
        private final LinkedListObject list = new LinkedListObject();
        // 用于原子操作计数器
        private final AtomicInteger count = new AtomicInteger(0);
        // 队列最大值和最小值
        private final int maxSize;
        private final int minSize = 0;
        // 用于synchronized对象锁
        private final Object lock = new Object();
    
        public DefinedArrayBlockingQueue(int maxSize) {
            this.maxSize = maxSize;
        }
    
        // 往队列中加入元素
        public void put(Object obj) {
            synchronized (lock) {
                // 当队列中元素达到最大值就等待状态
                while (count.get() == maxSize) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                list.add(obj);
                // 计数器+1
                count.getAndIncrement();
                System.out.println(" 元素 " + obj + " 被添加 ");
                // 唤醒take方法的wait
                lock.notify();
            }
        }
    
        public Object take() {
            Object temp = null;
            synchronized (lock) {
                while (count.get() == minSize) {
                    try {
                        lock.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
                // 计数器-1
                count.getAndDecrement();
                temp = list.removeFirst();
                System.out.println(" 元素 " + temp + " 被消费 ");
                // 唤醒put方法里面的wait
                lock.notify();
            }
            return temp;
        }
    
        public int size() {
            return count.get();
        }
    
        public static void main(String[] args) throws Exception {
            // 定义一个自定义队列,最大长度设置为5
            final DefinedArrayBlockingQueue m = new DefinedArrayBlockingQueue(5);
            m.put("1");
            m.put("2");
            m.put("3");
            m.put("4");
            m.put("5");
            System.out.println("当前元素个数:" + m.size());
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    m.put("6");
                    m.put("7");
                }
            }, "t1");
    
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        m.take();
                        Thread.sleep(1000);
                        m.take();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "t2");
            t1.start();
            Thread.sleep(1000);
            t2.start();
        }
    }
    

    运行结果:

    并发编程系列之线程之间的通信

    我们再来分析下,等待唤醒机制,并做个总结,整个等待唤醒的过程如下:等待线程首先获取对象锁,然后调用对象的wait()方法,此时释放对象锁,由于等待线程释放了对象锁,唤醒线程随后获取到对象锁,并调用对象的notify()方法,将等待队列中的线程移到了同步队列中,此时等待线程状态的状态变为阻塞状态,唤醒线程释放了锁之后,等待线程再次获取到对象锁,并从wait()方法返回继续执行,过程如下图:

    并发编程系列之线程之间的通信 并发编程系列之线程之间的通信

    景点四:管道输入/输出流

    管道输入输出流和普通文件的输入输出不同之处在于,它是作用于线程之间的数据传输,以内存作为传输媒介,主要包括下面4种具体实现:

  • PipedOutputStream(字节)
  • PipedInputStream(字节)
  • PipedReader(字符)
  • PipedWriter(字符)
  • PipedInputStream(字节)

    PipedWriter(字符)

    我们来看下下面这个demo:

    
    public class PipedDemo {
    
        public static void main(String[] args) throws IOException {
            // 定义一个输入流
            PipedWriter out = new PipedWriter();
            // 定义一个输出流
            PipedReader in = new PipedReader();
            // 输入输出建立连接
            out.connect(in);
            Thread printThread = new Thread(new PrintThread(in),"打印");
            printThread.start();
            int receive = 0;
            try {
                // 输入键盘字符
                System.out.println("请输入任意字符,并按enter键发送:");
                while ((receive = System.in.read()) != -1) {
                    out.write(receive);
                }
            }finally {
                out.close();
            }
        }
    
    
        static class PrintThread implements Runnable{
            private PipedReader in;
            // 读取写入的字符
            public PrintThread(PipedReader in) {
                this.in = in;
            }
    
            @Override
            public void run() {
                int receive = 0;
                try {
                    while ((receive = in.read()) != -1){
                        System.out.print((char)receive);
                    }
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }
    

    运行结果如下:

    并发编程系列之线程之间的通信 并发编程系列之线程之间的通信

    景点五:join方法

    如果一个线程执行了thread.join()方法,就说明:当前主线程需要等待执行join方法的线程终止之后才从thread.join()中返回,主线程才继续往下执行,join主要包括下面3个方法:

  • join源码:

  • public final void join() throws InterruptedException {
            join(0);
        }
    join超时返回源码:
    public final synchronized void join(long millis)
        throws InterruptedException {
            long base = System.currentTimeMillis();
            long now = 0;
    
    

           if (millis 0) {
               throw new IllegalArgumentException(“timeout value is negative”);
           }

           if (millis == 0) {
               while (isAlive()) {
                   wait(0);
               }
           } else {
               while (isAlive()) {
                   long delay = millis - now;
                   if (delay = 0) {
                       break;
                   }
                   wait(delay);
                   now = System.currentTimeMillis() - base;
               }
           }
       }

  • join超时返回源码(更细粒度的控制超时时间):
    public final synchronized void join(long millis, int nanos)
        throws InterruptedException {
            if (millis  0) {
                throw new IllegalArgumentException("timeout value is negative");
            }
            if (nanos  0 || nanos  999999) {
                throw new IllegalArgumentException(
                                    "nanosecond timeout value out of range");
            }
            if (nanos = 500000 || (nanos != 0 && millis == 0)) {
                millis++;
            }
            join(millis);
        }
  • join超时返回源码:

    
    public final synchronized void join(long millis)
        throws InterruptedException {
            long base = System.currentTimeMillis();
            long now = 0;
    
            if (millis  0) {
                throw new IllegalArgumentException("timeout value is negative");
            }
    
            if (millis == 0) {
                while (isAlive()) {
                    wait(0);
                }
            } else {
                while (isAlive()) {
                    long delay = millis - now;
                    if (delay = 0) {
                        break;
                    }
                    wait(delay);
                    now = System.currentTimeMillis() - base;
                }
            }
        }
    

    我们再看下如何使用join:

    
    public class JoinDemo implements Runnable {
            @Override
            public void run() {
                System.out.println("线程"+Thread.currentThread().getName()+":开始运行"+ DateUtil.getNowDate());
                try{
                    Thread.sleep(5000);
                }catch(InterruptedException e){
                    e.printStackTrace();
                }
                System.out.println("线程"+Thread.currentThread().getName()+":结束运行"+DateUtil.getNowDate());
            }
    
        public static void main(String[] args) {
            System.out.println("Main Thread Start..."+DateUtil.getNowDate());
            JoinDemo joinDemo = new JoinDemo();
            Thread t1 = new Thread(joinDemo,"t1");
            Thread t2 = new Thread(joinDemo,"t2");
            t1.start();
            t2.start();
        }
    }
    

    运行结果如下:

    并发编程系列之线程之间的通信

    我们使用join之后再看看运行结果会怎样:

    并发编程系列之线程之间的通信

    使用join之后我们发现,线程1没有返回之前,主线程一直在阻塞着,线程2一直没有启动,等到5秒线程1运行结束之后,主线程继续,线程2启动并执行。

    并发编程系列之线程之间的通信

    景点六:ThreadLocal本地线程

    线程变量,以一个ThreadLocal对象为键,任意对象为值得存储结构,一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值,相当于每个线程都有一个独立的本地对象,这块内存是线程私有的。对其他线程是不可见的,线程可以利用这块内存做自己的事,而不会受其他线程干扰。

    我们来看下面的demo:

    
    public class ThreadLocalDemo {
        public static ThreadLocalString th = new ThreadLocalString();
        public void setTh(String value){
            th.set(value);
        }
        public void getTh(){
            System.out.println(Thread.currentThread().getName() + "的值:" + this.th.get());
        }
    
        public static void main(String[] args) throws InterruptedException {
            final ThreadLocalDemo ct = new ThreadLocalDemo();
            Thread t1 = new Thread(new Runnable() {
                @Override
                public void run() {
                    ct.setTh("Justin");
                    ct.getTh();
                }
            }, "线程1");
    
            Thread t2 = new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Thread.sleep(1000);
                        //ct.setTh("Java");
                        ct.getTh();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }, "线程2");
    
            t1.start();
            t2.start();
        }
    }
    

    运行结果如下:结果很正常,对于同一个对象,线程1第一次赋值,线程2第二次赋值,分别输出第一次和第二次的结果。

    并发编程系列之线程之间的通信

    当我们把线程2赋值注释起来,按道理是应该会拿到线程1赋的值,但是我们看下面的结果:很明显线程2拿到个null,说明线程2没获取到线程1的值,这相当于,虽然是同一个对象,但是在两个线程中却有着不同的副本。

    并发编程系列之线程之间的通信

    今天的旅途就到这了,我们已经逛完了线程之间通信的所有景点,感谢光临,下次再见!!!

    相关旅程:

    原文始发于微信公众号(Justin的后端书架):

    本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

    本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

    原文链接:blog.ouyangsihai.cn >> 并发编程系列之线程之间的通信


     上一篇
    并发编程系列之重入锁VS读写锁 并发编程系列之重入锁VS读写锁
    前言 上节我们介绍了Java中的锁基础篇,也算是对锁有了个基本的认识,对锁底层的一些原理有所掌握,那么今天我们就来看看2个最常见的锁的实例应用,重入锁和读写锁,这是今天旅途最美的两大景点,是不是有点迫不及待了,OK,那就让我们一起开启
    2021-04-05
    下一篇 
    并发编程系列之线程的启动终止 并发编程系列之线程的启动终止
    前言 上节我们对线程有了个基本的概念和认识,从线程状态转变过程我们也已经知道了线程通过调用start方法进行启动,直到run方法执行线程结束,今天我们就来详细的说说启动和终止线程的细节,OK,让我们开始今天的并发之旅吧。 创建线程
    2021-04-05