前言
上节我们介绍了线程安全的HashMap,今天我们再来介绍一个线程安全的并发容器:ConcurrentLinkedQueue,它是一个线程安全的队列,在Java中如果要实现一个线程安全的队列由2种方式:一个是使用阻塞算法的队列,用一个锁(入队和出队列共享同一把锁)或者两个锁(入队和出队各持一把锁)来实现,另一个是使用非阻塞的CAS方式来实现,今天我们我们就来一起看看如何使用非阻塞方式实现ConcurrentLinkedQueue,OK,让我们扬帆起航,开始今天的并发之旅吧。
什么是ConcurrentLinkedQueue?
ConcurrentLinkedQueue是一个适用于高并发场景的队列,通过无锁的方式,实现了高并发下状态下的高性能,性能一般要比BlockingQueue(阻塞队列)性能好,它是一个基于链表节点的无界限安全队列(不允许Null元素存在,有null存在则链表就会断了)。该队列的元素遵循先进先出(FIFO)原则,头是最先加入的,尾是最近加入的,单向管道一头进另一头出,它采用了无等待算法来实现(CAS)。
ConcurrentLinkedQueue的结构(tair应该为tail,图是网上引的所以特意说明下)
初始化时head为空,tair=head,我们可以从源码看出这点:
private transient volatile NodeE tail;
public ConcurrentLinkedQueue() {
head = tail = new NodeE(null);
}
ConcurrentLinkedQueue主要方法:
入队列
入队列的过程:入队列就是将入队列的节点添加到队列的尾部的过程,例如我们添加向一个空队列,我们来看下,节点的插入过程,如下图所示:
从上面的示例我们可以看出,tail不一定是队列的尾节点,再元素入队列的时候,会先判断当前tail节点的next节点是否为空,如果为空,则将新增元素节点设置为tail的next节点,tail保持不变,如果不为空,则将新增节点设置为tail,当前tail的next则为空。
多线程环境下,情况比上面要更复杂,例如一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,先完成了入队,那么队列的尾节点就会发生变化,此时当前线程就必须暂停入队操作,然后重新获取尾节点,再进行入队,下面我们来看下源码实现:
public boolean offer(E e) {
// 检查入队节点是否为空,为空则抛出异常(进入checkNotNull方法可查阅)
checkNotNull(e);
// 入队前,创建一个入队节点newNode
final NodeE newNode = new NodeE(e);
// 死循环,入队不成功反复入队 p用来表示队列的尾节点,默认情况下等于tail节点,t是一个指向tail的引用
for (NodeE t = tail, p = t;;) {
// 获得p节点的下一个节点q
NodeE q = p.next;
// 如果q为空,,说明p为尾节点,则将新增节点newNode设置为tail的next节点
if (q == null) {
// 将新节点以CAS方式加到队列末尾
if (p.casNext(null, newNode)) {
// 如果p和tail引用p不同,说明引用t指向的节点不是最新尾节点,让t指向最新的节点
if (p != t)
// 更新tail节点引用
casTail(t, newNode);
return true;
}
}
// 说明p节点和其尾节点都是null,这表明这个队列刚刚初始化,因此返回head节点
else if (p == q)
p = (t != (t = tail)) ? t : head;
// 否则说明p不为空,将新增的尾节点节点设置为tail节点
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
入队还有一个方法:add,我们再来看下add方法,其本质还是offer,如下:
public boolean add(E e) {
return offer(e);
}
小结:从源码我们得出入队列主要做2件事:定位出尾节点和使用CAS算法能将入队节点设置成尾节点的next节点,如不成功则重试。此外我们从源码能看出来offer方法总是返回true,所以我们不能感觉返回值来判断是否真正入队列成功,如果要判断,可以入队列之前和完成之后分别获取下队列的大小,再相比较;
出队列
出队列过程:出队列就是一个从队列里返回头节点元素,并清空该节点对元素的引用的过程,和入队列过程相反,这里就不举例子说明了,我们直接往下看,出队列有2个方法,poll和peek都是取头元素节点,区别在于前者会删除元素,后者不会,我们看源码如下:
public E poll() {
// 设置起始点
restartFromHead:
for (;;) {
// h指向head,p表示头结点,需要出队的节点,q表示next节点
for (NodeE h = head, p = h, q;;) {
E item = p.item;
// 如果p节点里面有元素,则该元素直接出队列,并将p元素节点CAS更新为NUll
if (item != null && p.casItem(item, null)) {
// p不为h的时候说明h还在前一个项为NUll的位置,而p在后一个项有item的位置,则更新h指向
if (p != h)
// 若p的后驱不为null,则将h指向p的后驱,因为当前p的项也是要出队的
// 若p的后驱为NUll,则将h指向p,此时他们的item都为Null
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
// p.item=null p的后驱为null,用P更新h值
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
else if (p == q)
continue restartFromHead;
// q在p后面,p中节点已经出队,则让p指向q
else
p = q;
}
}
}
我们再看下peek的源码,跟poll差不多,只是减少了删除出队列的节点操作,如下:
public E peek() {
restartFromHead:
for (;;) {
for (NodeE h = head, p = h, q;;) {
E item = p.item;
if (item != null || (q = p.next) == null) {
updateHead(h, p);
return item;
}
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
小结:出队列过程中首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作,将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。
Size方法
这里我将size方法做个说明,主要是因为size比较特殊:用循环获取,判断节点是否有值。然后累加,这是一种非常低效的方式,我们在判断一个ConcurrentLinkedQueue是否为空时,尽量使用isEmpty()方法,而避免使用size()方法降低程序执行效率,源代码如下:
public int size() {
int count = 0;
for (NodeE p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
相关文章:
原文始发于微信公众号(Justin的后端书架):