前言
上节我们介绍了非阻塞队列ConcurrentLinkedQueue的相关内容,今天我们再来说说Java中的阻塞队列BlockingQueue,主要介绍下阻塞队列的概念,常见的阻塞队列,以及阻塞队列的底层实现。
什么是阻塞队列?
阻塞队列就是一种支持阻塞的插入和移除操作的特殊容器
阻塞的移除:当队列中没有元素时,即队列为空时,从队列中移除元素的线程就会被阻塞,直到队列中有新的元素被添加,即队列中有元素时,阻塞的线程才能继续从队列中移除元素;
阻塞队列的常见操作如下:
常见的几种阻塞队列
BlockingQueue是一个接口,主要有下面7种实现类
- transfer(E):如果当前有消费者正在等待消费,则生产者直接把元素传输给消费者,如果当前没有消费者正在等待消费,则生产者将元素存放在队列的tail节点上,并等到该元素被消费才返回(采用自旋等待);
- tryTransfer(E):将元素立刻传输给一个等待接收元素的线程,如果没有消费者就会返回false,而不将元素放入队列;
- tryTransfer(E,long,TimeUnit):将元素立刻给消费者,如果没有消费者就等待指定时间。时间到时,如果还没有消费者则失败返回false;
LinkedBlockingQueue:基于链表的阻塞队列,跟ArrayBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),LinkedBlockingQueue之所以能够高效的处理并发数据,是因为其内部实现采用分离锁(读写分离两个锁),从而实现生产者和消费者操作完全并发执行,也是一个“无界队列”
DelayQueue:带有延迟时间的无界阻塞Queue,其中的元素只有当指定的延迟时间到了,才能够从队列中获取该元素。DelayQueue中的元素必须实现Delayed接口,DelayQueue是一个没有大小限制的队列,应用场景比较多,比如对缓存超时的数据进行移除,任务超时处理,空间连接的关闭等等
LinkedTransferQueue:一个由链表结构组成的无界阻塞传输队列,主要体现在LinkedTransferQueue多2个方法:
tryTransfer(E):将元素立刻传输给一个等待接收元素的线程,如果没有消费者就会返回false,而不将元素放入队列;
LinkedBlockingDeque:由链表结构组成的双向阻塞队列,即可以从队列的两端插入和移出元素
阻塞队列的底层实现
阻塞队列的底层主要使用的还是之前我们介绍过得等待通知机制来实现的,等待通知机制在阻塞队列中具体体现为如下思想:当生产者往一个满队列中添加元素时,生产者会被阻塞,当消费者从该队列中消费了一个元素后,会通知阻塞的插入操作的生产者线程,告诉它当前队列不满,可以继续执行添加操作。我们通过下面源码可以更好的理解这一点:
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity = 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
// 使用condition模式等待通知
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// 当队列满时,阻塞
notFull.await();
// 否则继续添加元素
insert(e);
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
// 添加元素时会唤醒等待移出数据的take线程
notEmpty.signal();
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 当队列为空时,获取数据的线程等待
notEmpty.await();
// 否则就取出元素,并且唤醒等待的put线程
return extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
E x = this.Ecast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
// 唤醒等待的put线程
notFull.signal();
return x;
}
相关文章:
原文始发于微信公众号(Justin的后端书架):