前言
上节我们介绍了线程从创建到结束的过程,介绍了几种常见的启动和终止方法,我们知道了如何使用一个线程,那么今天我们再接下来看看1个或者多个线程之间是如何进行通信的?OK,让我们一起走进今天的并发之旅吧,祝您旅途愉快。
景点一:共享变量(volatile)
关键字volatile可以用来修饰一个共享的变量字段,使用volatile修饰的变量,当访问该变量时需要从共享内存中获取最新值,而对该变量的更新,必须同步刷新回共享内存,这样就能保证所有线程对该变量访问的可见性;结合示例代码更容易理解:
public class RunThread extends Thread{
private volatile static boolean isRunning = true;
private void setRunning(boolean isRunning){
RunThread.isRunning = isRunning;
}
public void run(){
System.out.println("进入run方法..");
while(isRunning == true){
}
System.out.println("线程感知isRunning值被设置成false,线程停止!!!");
}
public static void main(String[] args) throws InterruptedException {
RunThread runThread1 = new RunThread();
RunThread runThread2 = new RunThread();
runThread1.start();
runThread2.start();
// 主线程睡眠1秒之后线程1将标识设为false
Thread.sleep(1000);
runThread1.setRunning(false);
System.out.println("isRunning的值已经被设置了false");
Thread.sleep(2000);
System.out.println(runThread1.isAlive());
System.out.println(runThread2.isAlive());
}
}
结果:
进入run方法..
进入run方法..
isRunning的值已经被设置了false
线程感知isRunning值被设置成false,线程停止!!!
线程感知isRunning值被设置成false,线程停止!!!
false
false
我们可以看到,开启2个线程,线程运行终止的条件是isRunning是否为false,当2个线程都启动运行之后,主线程睡眠1秒,然后线程1将isRunning设为false,会发现,线程1和线程2立马感应到了isRunning值的更新,结束了线程的运行;
当去掉volatile修饰共享变量时,结果如下:2个线程都未感应isRunning的变化。
景点二:同步代码(synchronized)
synchronized修饰的方法或者代码块,主要是确保多个线程在同一时刻,只能有一个线程处于方法或者代码块中,保证了线程对变量访问的可见性和排他性,我们来看下面这段代码示例:
public class SynchronizedDemo {
// 方法1 线程1将调用
public synchronized void method1() {
try {
System.out.println(Thread.currentThread().getName() + "执行method1方法,并获取锁,主线程睡眠4秒");
Thread.sleep(4000);
System.out.println(Thread.currentThread().getName() + "释放锁");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 方法2 线程2将调用
public synchronized void method2() {
System.out.println(Thread.currentThread().getName() + "执行method2方法");
}
public static void main(String[] args) throws Exception {
final SynchronizedDemo synchronizedDemo = new SynchronizedDemo();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
synchronizedDemo.method1();
}
}, "线程1");
// 为了保证线程1和2的执行顺序我让线程2延迟0.5秒再启动
Thread.sleep(500);
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
synchronizedDemo.method2();
}
}, "线程2");
t1.start();
t2.start();
}
}
执行结果如下:
当我们把method2方法的synchronized去掉之后,再看执行结果:
从上面示例我们就能看出:当加了synchronized修饰之后线程1首先启动并持有对象SynchronizedDemo的对象锁,线程2启动时,发现锁被线程1占用,处于等待状态,等待3.4秒之后线程1释放锁,线程2获得锁,执行method2方法。本示例中,synchronized同步的是普通方法,所以持有的是当前实例对象锁,如果是方法块,锁是synchronized包含的代码块。
我们去就提到过,synchronized是通过Monitor对象来实现同步的,那么我们今天来对这点做个补充,我们看下Monitor是如何工作的,如下图:
总结:任意一个线程对Object对象(synchronized修饰)的访问,首先要获得Object对象的监视器,如果获取失败,该线程将进入同步队列等待,该线程状态变为阻塞(BLOCKED),当访问Object的前(已经获得锁的线程)线程释放了锁,则唤醒阻塞队列中的线程,使阻塞线程重新尝试对监视器的获取;
景点三:等待/通知
等待/通知机制指的是一个线程1调用了对象的wait()方法进入等待状态,另一个线程2调用该对象的notifyAll()方法,线程1收到了通知之后从对象的wait()方法返回,进而执行后续的操作,两个线程通过对象来完成交互,而对象上的wait和notify/notifyAll的关系就像一个开关信号一样,用来完成等待方和通知方之间的交互工作。我们先来看看这几个方法:
wait(long,int):第一个参数为毫秒,第二个参数为纳秒,对超时返回更细粒度的控制;
notifyAll():通知所有等待在该对象上的线程,由WAITING状态变为BLOCKING状态,等待CPU调度获取该对象的锁;
wait(long):超时等待一段时间,参数为毫秒,也就是等待长达n毫秒,如果没有通知就超时返回;
那么这几个方法如何使用呢?我们来看下面2个示例:
public class WaitAndNotifyDemo {
// 定义一个list属性
private volatile static List list = new ArrayList();
// 提供一个add方法
public void add() {
list.add("justin");
}
// 获取list的大小
public int size() {
return list.size();
}
public static void main(String[] args) {
final WaitAndNotifyDemo list2 = new WaitAndNotifyDemo();
final Object lock = new Object();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
try {
// 使用synchronized对象锁加锁
synchronized (lock) {
System.out.println("t1启动..");
for (int i = 0; i 6; i++) {
// 往list中添加元素,当list中添加了5个元素之后,唤醒等待对象锁的线程2
list2.add();
System.out.println("当前线程:" + Thread.currentThread().getName() + "添加了第"+(i+1)+"个元素..");
Thread.sleep(500);
if (list2.size() == 5) {
System.out.println("已经发出通知..");
lock.notify();
}
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
// synchronized对象锁加锁吗,当list的大小不为5时,就等待,wait会释放lock锁,t1才能获得锁
synchronized (lock) {
System.out.println("t2启动..");
if (list2.size() != 5) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println("当前线程:" + Thread.currentThread().getName() + "收到通知线程停止..");
throw new RuntimeException();
}
}
}, "t2");
// 为了保证先启动线程2执行等待,所以我给线程1加了个延迟0.5秒启动
t2.start();
try {
Thread.sleep(500);
t1.start();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行结果如下:
我们再看另一个例子,利用等待/通知机制我们来自定义实现一个有界阻塞队列ArrayBlockingQueue:
public class DefinedArrayBlockingQueue {
// 定义一个list
private final LinkedListObject list = new LinkedListObject();
// 用于原子操作计数器
private final AtomicInteger count = new AtomicInteger(0);
// 队列最大值和最小值
private final int maxSize;
private final int minSize = 0;
// 用于synchronized对象锁
private final Object lock = new Object();
public DefinedArrayBlockingQueue(int maxSize) {
this.maxSize = maxSize;
}
// 往队列中加入元素
public void put(Object obj) {
synchronized (lock) {
// 当队列中元素达到最大值就等待状态
while (count.get() == maxSize) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
list.add(obj);
// 计数器+1
count.getAndIncrement();
System.out.println(" 元素 " + obj + " 被添加 ");
// 唤醒take方法的wait
lock.notify();
}
}
public Object take() {
Object temp = null;
synchronized (lock) {
while (count.get() == minSize) {
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
// 计数器-1
count.getAndDecrement();
temp = list.removeFirst();
System.out.println(" 元素 " + temp + " 被消费 ");
// 唤醒put方法里面的wait
lock.notify();
}
return temp;
}
public int size() {
return count.get();
}
public static void main(String[] args) throws Exception {
// 定义一个自定义队列,最大长度设置为5
final DefinedArrayBlockingQueue m = new DefinedArrayBlockingQueue(5);
m.put("1");
m.put("2");
m.put("3");
m.put("4");
m.put("5");
System.out.println("当前元素个数:" + m.size());
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
m.put("6");
m.put("7");
}
}, "t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
m.take();
Thread.sleep(1000);
m.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "t2");
t1.start();
Thread.sleep(1000);
t2.start();
}
}
运行结果:
我们再来分析下,等待唤醒机制,并做个总结,整个等待唤醒的过程如下:等待线程首先获取对象锁,然后调用对象的wait()方法,此时释放对象锁,由于等待线程释放了对象锁,唤醒线程随后获取到对象锁,并调用对象的notify()方法,将等待队列中的线程移到了同步队列中,此时等待线程状态的状态变为阻塞状态,唤醒线程释放了锁之后,等待线程再次获取到对象锁,并从wait()方法返回继续执行,过程如下图:
景点四:管道输入/输出流
管道输入输出流和普通文件的输入输出不同之处在于,它是作用于线程之间的数据传输,以内存作为传输媒介,主要包括下面4种具体实现:
PipedInputStream(字节)
PipedWriter(字符)
我们来看下下面这个demo:
public class PipedDemo {
public static void main(String[] args) throws IOException {
// 定义一个输入流
PipedWriter out = new PipedWriter();
// 定义一个输出流
PipedReader in = new PipedReader();
// 输入输出建立连接
out.connect(in);
Thread printThread = new Thread(new PrintThread(in),"打印");
printThread.start();
int receive = 0;
try {
// 输入键盘字符
System.out.println("请输入任意字符,并按enter键发送:");
while ((receive = System.in.read()) != -1) {
out.write(receive);
}
}finally {
out.close();
}
}
static class PrintThread implements Runnable{
private PipedReader in;
// 读取写入的字符
public PrintThread(PipedReader in) {
this.in = in;
}
@Override
public void run() {
int receive = 0;
try {
while ((receive = in.read()) != -1){
System.out.print((char)receive);
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
运行结果如下:
景点五:join方法
如果一个线程执行了thread.join()方法,就说明:当前主线程需要等待执行join方法的线程终止之后才从thread.join()中返回,主线程才继续往下执行,join主要包括下面3个方法:
join源码:
public final void join() throws InterruptedException {
join(0);
}
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis 0) {
throw new IllegalArgumentException(“timeout value is negative”);
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay = 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
public final synchronized void join(long millis, int nanos)
throws InterruptedException {
if (millis 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (nanos 0 || nanos 999999) {
throw new IllegalArgumentException(
"nanosecond timeout value out of range");
}
if (nanos = 500000 || (nanos != 0 && millis == 0)) {
millis++;
}
join(millis);
}
join超时返回源码:
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis 0) {
throw new IllegalArgumentException("timeout value is negative");
}
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
while (isAlive()) {
long delay = millis - now;
if (delay = 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
我们再看下如何使用join:
public class JoinDemo implements Runnable {
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+":开始运行"+ DateUtil.getNowDate());
try{
Thread.sleep(5000);
}catch(InterruptedException e){
e.printStackTrace();
}
System.out.println("线程"+Thread.currentThread().getName()+":结束运行"+DateUtil.getNowDate());
}
public static void main(String[] args) {
System.out.println("Main Thread Start..."+DateUtil.getNowDate());
JoinDemo joinDemo = new JoinDemo();
Thread t1 = new Thread(joinDemo,"t1");
Thread t2 = new Thread(joinDemo,"t2");
t1.start();
t2.start();
}
}
运行结果如下:
我们使用join之后再看看运行结果会怎样:
使用join之后我们发现,线程1没有返回之前,主线程一直在阻塞着,线程2一直没有启动,等到5秒线程1运行结束之后,主线程继续,线程2启动并执行。
景点六:ThreadLocal本地线程
线程变量,以一个ThreadLocal对象为键,任意对象为值得存储结构,一个线程可以根据一个ThreadLocal对象查询到绑定在这个线程上的一个值,相当于每个线程都有一个独立的本地对象,这块内存是线程私有的。对其他线程是不可见的,线程可以利用这块内存做自己的事,而不会受其他线程干扰。
我们来看下面的demo:
public class ThreadLocalDemo {
public static ThreadLocalString th = new ThreadLocalString();
public void setTh(String value){
th.set(value);
}
public void getTh(){
System.out.println(Thread.currentThread().getName() + "的值:" + this.th.get());
}
public static void main(String[] args) throws InterruptedException {
final ThreadLocalDemo ct = new ThreadLocalDemo();
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
ct.setTh("Justin");
ct.getTh();
}
}, "线程1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000);
//ct.setTh("Java");
ct.getTh();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}, "线程2");
t1.start();
t2.start();
}
}
运行结果如下:结果很正常,对于同一个对象,线程1第一次赋值,线程2第二次赋值,分别输出第一次和第二次的结果。
当我们把线程2赋值注释起来,按道理是应该会拿到线程1赋的值,但是我们看下面的结果:很明显线程2拿到个null,说明线程2没获取到线程1的值,这相当于,虽然是同一个对象,但是在两个线程中却有着不同的副本。
今天的旅途就到这了,我们已经逛完了线程之间通信的所有景点,感谢光临,下次再见!!!
相关旅程:
原文始发于微信公众号(Justin的后端书架):