并发编程系列之并发容器——ConcurrentLinkedQueue

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 并发编程系列之并发容器——ConcurrentLinkedQueue

并发编程系列之并发容器:ConcurrentLinkedQueue

前言

上节我们介绍了线程安全的HashMap,今天我们再来介绍一个线程安全的并发容器:ConcurrentLinkedQueue,它是一个线程安全的队列,在Java中如果要实现一个线程安全的队列由2种方式:一个是使用阻塞算法的队列,用一个锁(入队和出队列共享同一把锁)或者两个锁(入队和出队各持一把锁)来实现,另一个是使用非阻塞的CAS方式来实现,今天我们我们就来一起看看如何使用非阻塞方式实现ConcurrentLinkedQueue,OK,让我们扬帆起航,开始今天的并发之旅吧。

并发编程系列之并发容器:ConcurrentLinkedQueue

什么是ConcurrentLinkedQueue?

ConcurrentLinkedQueue是一个适用于高并发场景的队列,通过无锁的方式,实现了高并发下状态下的高性能,性能一般要比BlockingQueue(阻塞队列)性能好,它是一个基于链表节点的无界限安全队列(不允许Null元素存在,有null存在则链表就会断了)。该队列的元素遵循先进先出(FIFO)原则,头是最先加入的,尾是最近加入的,单向管道一头进另一头出,它采用了无等待算法来实现(CAS)。

并发编程系列之并发容器:ConcurrentLinkedQueue

ConcurrentLinkedQueue的结构(tair应该为tail,图是网上引的所以特意说明下

并发编程系列之并发容器:ConcurrentLinkedQueue 并发编程系列之并发容器:ConcurrentLinkedQueue

初始化时head为空,tair=head,我们可以从源码看出这点:


private transient volatile NodeE tail;
public ConcurrentLinkedQueue() {
       head = tail = new NodeE(null);
   }

ConcurrentLinkedQueue主要方法:

并发编程系列之并发容器:ConcurrentLinkedQueue 并发编程系列之并发容器:ConcurrentLinkedQueue

入队列

入队列的过程:入队列就是将入队列的节点添加到队列的尾部的过程,例如我们添加向一个空队列,我们来看下,节点的插入过程,如下图所示:

并发编程系列之并发容器:ConcurrentLinkedQueue

从上面的示例我们可以看出,tail不一定是队列的尾节点,再元素入队列的时候,会先判断当前tail节点的next节点是否为空,如果为空,则将新增元素节点设置为tail的next节点,tail保持不变,如果不为空,则将新增节点设置为tail,当前tail的next则为空。

多线程环境下,情况比上面要更复杂,例如一个线程正在入队,那么它必须先获取尾节点,然后设置尾节点的下一个节点为入队节点,但这时可能有另外一个线程插队了,先完成了入队,那么队列的尾节点就会发生变化,此时当前线程就必须暂停入队操作,然后重新获取尾节点,再进行入队,下面我们来看下源码实现:


public boolean offer(E e) {
       // 检查入队节点是否为空,为空则抛出异常(进入checkNotNull方法可查阅)
       checkNotNull(e);
       // 入队前,创建一个入队节点newNode 
       final NodeE newNode = new NodeE(e);
       // 死循环,入队不成功反复入队 p用来表示队列的尾节点,默认情况下等于tail节点,t是一个指向tail的引用
       for (NodeE t = tail, p = t;;) {
           // 获得p节点的下一个节点q
           NodeE q = p.next;
           // 如果q为空,,说明p为尾节点,则将新增节点newNode设置为tail的next节点
           if (q == null) {
               // 将新节点以CAS方式加到队列末尾
               if (p.casNext(null, newNode)) {
                   // 如果p和tail引用p不同,说明引用t指向的节点不是最新尾节点,让t指向最新的节点
                   if (p != t)
                       // 更新tail节点引用
                       casTail(t, newNode);  
                   return true;
               }
           }
           // 说明p节点和其尾节点都是null,这表明这个队列刚刚初始化,因此返回head节点 
           else if (p == q)
               p = (t != (t = tail)) ? t : head;
           // 否则说明p不为空,将新增的尾节点节点设置为tail节点
           else
               p = (p != t && t != (t = tail)) ? t : q;
       }
   }

入队还有一个方法:add,我们再来看下add方法,其本质还是offer,如下:


public boolean add(E e) {
       return offer(e);
   }

小结:从源码我们得出入队列主要做2件事:定位出尾节点和使用CAS算法能将入队节点设置成尾节点的next节点,如不成功则重试。此外我们从源码能看出来offer方法总是返回true,所以我们不能感觉返回值来判断是否真正入队列成功,如果要判断,可以入队列之前和完成之后分别获取下队列的大小,再相比较;

并发编程系列之并发容器:ConcurrentLinkedQueue

出队列

出队列过程:出队列就是一个从队列里返回头节点元素,并清空该节点对元素的引用的过程,和入队列过程相反,这里就不举例子说明了,我们直接往下看,出队列有2个方法,poll和peek都是取头元素节点,区别在于前者会删除元素,后者不会,我们看源码如下:


public E poll() {
      // 设置起始点
       restartFromHead:
       for (;;) {
      // h指向head,p表示头结点,需要出队的节点,q表示next节点
           for (NodeE h = head, p = h, q;;) {
               E item = p.item;
     // 如果p节点里面有元素,则该元素直接出队列,并将p元素节点CAS更新为NUll
               if (item != null && p.casItem(item, null)) {
                   // p不为h的时候说明h还在前一个项为NUll的位置,而p在后一个项有item的位置,则更新h指向 
                   if (p != h) 
                       // 若p的后驱不为null,则将h指向p的后驱,因为当前p的项也是要出队的
                       // 若p的后驱为NUll,则将h指向p,此时他们的item都为Null
                       updateHead(h, ((q = p.next) != null) ? q : p);
                   return item;
               }
               // p.item=null p的后驱为null,用P更新h值
               else if ((q = p.next) == null) {
                   updateHead(h, p);
                   return null;
               }
               else if (p == q)
                   continue restartFromHead;
               // q在p后面,p中节点已经出队,则让p指向q  
               else
                   p = q;
           }
       }
   }

我们再看下peek的源码,跟poll差不多,只是减少了删除出队列的节点操作,如下:


public E peek() {
        restartFromHead:
        for (;;) {
            for (NodeE h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

小结:出队列过程中首先获取头节点的元素,然后判断头节点元素是否为空,如果为空,表示另外一个线程已经进行了一次出队操作,将该节点的元素取走,如果不为空,则使用CAS的方式将头节点的引用设置成null,如果CAS成功,则直接返回头节点的元素,如果不成功,表示另外一个线程已经进行了一次出队操作更新了head节点,导致元素发生了变化,需要重新获取头节点。

并发编程系列之并发容器:ConcurrentLinkedQueue

Size方法

这里我将size方法做个说明,主要是因为size比较特殊:用循环获取,判断节点是否有值。然后累加,这是一种非常低效的方式,我们在判断一个ConcurrentLinkedQueue是否为空时,尽量使用isEmpty()方法,而避免使用size()方法降低程序执行效率,源代码如下:


public int size() {
        int count = 0;
        for (NodeE p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

相关文章:

并发编程系列之并发容器:ConcurrentLinkedQueue

原文始发于微信公众号(Justin的后端书架):

本人花费半年的时间总结的《Java面试指南》已拿腾讯等大厂offer,已开源在github ,欢迎star!

本文GitHub https://github.com/OUYANGSIHAI/JavaInterview 已收录,这是我花了6个月总结的一线大厂Java面试总结,本人已拿大厂offer,欢迎star

原文链接:blog.ouyangsihai.cn >> 并发编程系列之并发容器——ConcurrentLinkedQueue


 上一篇
并发编程系列之阻塞队列(BlockingQueue) 并发编程系列之阻塞队列(BlockingQueue)
前言 上节我们介绍了非阻塞队列ConcurrentLinkedQueue的相关内容,今天我们再来说说Java中的阻塞队列BlockingQueue,主要介绍下阻塞队列的概念,常见的阻塞队列,以及阻塞队列的底层实现。 什么是阻塞队
2021-04-05
下一篇 
并发编程系列之并发容器——ConcurrentHashMap 并发编程系列之并发容器——ConcurrentHashMap
前言 之前我们讲了线程,锁以及队列同步器等等一些并发相关底层的东西,当然Java开发者在开发中很少直接去使用之前所讲的,因为Java为了简化开发,为我们提供了一整套并发容器和框架,但是这些容器和框架都是建立在之前所讲的基础之上的,今天
2021-04-05