简介

  • AbstractQueuedSynchronizer(AQS ) 即队列同步器。它是构建锁或者其他同步组件的基础框架(如 ReentrantLock、ReentrantReadWriteLock、Semaphore 等),它使用了一个int成员变量表示同步状态,通过内置的FIFO队列完成资源获取线程的派对工作 J.U.C 并发包的作者(Doug Lea)期望它能够成为实现大部分同步需求的基础。它是 J.U.C 并发包中的核心基础组件。

优势

  • AQS 解决了在实现同步器时涉及当的大量细节问题,例如获取同步状态、FIFO 同步队列。基于 AQS 来构建同步器可以带来很多好处。它不仅能够极大地减少实现工作,而且也不必处理在多个位置上发生的竞争问题。
  • 在基于 AQS 构建的同步器中,只能在一个时刻发生阻塞,从而降低上下文切换的开销,提高了吞吐量。同时在设计 AQS 时充分考虑了可伸缩性,因此 J.U.C 中,所有基于 AQS 构建的同步器均可以获得这个优势。

同步状态

  • AQS 的主要使用方式是继承,子类通过继承同步器,并实现它的抽象方法来管理同步状态。

  • AQS 使用一个 int 类型的成员变量 state 来表示同步状态:

    • 当 state > 0 时,表示已经获取了锁。
    • 当 state = 0 时,表示释放了锁。
  • 它提供了三个方法,来对同步状态 state 进行操作,并且 AQS 可以确保对 state 的操作是安全的:
    • getState()
    • setState(int newState)
    • compareAndSetState(int expect, int update)

同步队列

  • AQS 通过内置的 FIFO 同步队列来完成资源获取线程的排队工作:

    • 如果当前线程获取同步状态失败(锁)时,AQS 则会将当前线程以及等待状态等信息构造成一个节点(Node)并将其加入同步队列,同时会阻塞当前线程
    • 当同步状态释放时,则会把节点中的线程唤醒,使其再次尝试获取同步状态。
  • 同步器的开始提到了其实现依赖于一个FIFO队列,该队列就是 CLH 同步队列,那么队列中的元素Node就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点。

  • Node 是 AbstractQueuedSynchronizer 的内部静态类,Node的主要包含以下成员变量:
属性名称描述
int waitStatus表示节点的状态。其中包含的状态有:
1. CANCELLED,值为1,由于在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待表示当前的线程被取消;
2. SIGNAL,值为-1,表示当前节点的后继节点包含的线程需要运行,将会通知后继节点,使后继节点的线程得以运行;
3. CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中;
4. PROPAGATE,值为-3,表示当前场景下后续的acquireShared能够得以执行;
5. 值为0,表示当前节点在sync队列中,等待着获取锁。
Node prev前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。
Node next后继节点。
Node nextWaiter存储condition队列中的后继节点。
Thread thread入队列时的当前线程。
  • 入列

    • 学了数据结构的我们,CLH 队列入列是再简单不过了
      • tail 指向新节点。
      • 新节点的 prev 指向当前最后的节点。
      • 当前最后一个节点的 next 指向当前节点。
    • 过程图如下:

    • 实际上,入队逻辑实现的 #addWaiter(Node) 方法,需要考虑并发的情况。它通过 CAS 的方式,来保证正确的添加 Node 。代码如下:
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      47
      48
      49
      50
      51
      52
      53
      /**
      * Creates and enqueues node for current thread and given mode.
      *
      * mode 方法参数,传递获取同步状态的模式。
      * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared
      * @return the new node
      */
      private Node addWaiter(Node mode) {
      // 新建节点
      Node node = new Node(Thread.currentThread(), mode);
      // Try the fast path of enq; backup to full enq on failure
      // 记录原尾节点
      Node pred = tail;
      // 快速尝试,添加新节点为尾节点
      if (pred != null) {
      // 设置新 Node 节点的尾节点为原尾节点
      node.prev = pred;
      // CAS 设置新的尾节点
      if (compareAndSetTail(pred, node)) {
      // 成功,原尾节点的下一个节点为新节点
      pred.next = node;
      return node;
      }
      }
      // 失败,多次尝试,直到成功
      enq(node);
      return node;
      }


      /**
      * Inserts node into queue, initializing if necessary. See picture above.
      * @param node the node to insert
      * @return node's predecessor
      */
      private Node enq(final Node node) {
      // 死循环,多次尝试,直到成功添加为止
      for (;;) {
      // 记录原尾节点
      Node t = tail;
      // 原尾节点不存在,创建首尾节点都为 new Node()
      if (t == null) { // Must initialize
      if (compareAndSetHead(new Node()))
      tail = head;
      } else { // 原尾节点存在,添加新节点为尾节点
      node.prev = t;
      if (compareAndSetTail(t, node)) {
      t.next = node;
      return t;
      }
      }
      }
      }
  • 出队

    • CLH 同步队列遵循 FIFO,首节点的线程释放同步状态后,将会唤醒它的下一个节点(Node.next)。而后继节点将会在获取同步状态成功时,将自己设置为首节点( head )。
    • 这个过程非常简单,head 执行该节点并断开原首节点的 next 和当前节点的 prev 即可。注意,在这个过程是不需要使用 CAS 来保证的,因为只有一个线程,能够成功获取到同步状态。
    • 过程图如下:

    • setHead(Node node) 方法,实现上述的出列逻辑。代码如下:
      1
      2
      3
      4
      5
      private void setHead(Node node) {
      head = node;
      node.thread = null;
      node.prev = null;
      }

API说明

  • 实现自定义同步器时,需要使用同步器提供的getState()、setState()和compareAndSetState()方法来操纵状态的变迁。
方法名称描述
getState()返回同步状态的当前值。
setState(int newState)设置当前同步状态。
compareAndSetState(int expect, int update)使用 CAS 设置当前状态,该方法能够保证状态设置的原子性。
protected boolean tryAcquire(int arg)【可重写】排它的获取这个状态。这个方法的实现需要查询当前状态是否允许获取,然后再进行获取(使用compareAndSetState来做)状态。
protected boolean tryRelease(int arg)【可重写】释放状态。
protected int tryAcquireShared(int arg)【可重写】共享的模式下获取状态。
protected boolean tryReleaseShared(int arg)【可重写】共享的模式下释放状态。
protected boolean isHeldExclusively()【可重写】在排它模式下,状态是否被占用。
acquire(int arg)独占式获取同步状态。如果当前线程获取同步状态成功,则由该方法返回;否则,将会进入同步队列等待。该方法将会调用可重写的 #tryAcquire(int arg) 方法;
acquireInterruptibly(int arg)与 acquire(int arg) 相同,但是该方法响应中断。当前线程为获取到同步状态而进入到同步队列中,如果当前线程被中断,则该方法会抛出InterruptedException 异常并返回。
tryAcquireNanos(int arg, long nanos)超时获取同步状态。如果当前线程在 nanos 时间内没有获取到同步状态,那么将会返回 false ,已经获取则返回 true 。
acquireShared(int arg)共享式获取同步状态,如果当前线程未获取到同步状态,将会进入同步队列等待,与独占式的主要区别是在同一时刻可以有多个线程获取到同步状态;
acquireSharedInterruptibly(int arg)共享式获取同步状态,响应中断。
tryAcquireSharedNanos(int arg, long nanosTimeout)共享式获取同步状态,增加超时限制。
release(int arg)独占式释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒。
releaseShared(int arg)共享式释放同步状态。

同步状态的获取与释放

  • AQS 的设计模式采用的模板方法模式,子类通过继承的方式,实现它的抽象方法来管理同步状态。对于子类而言,它并没有太多的活要做,AQS 已经提供了大量的模板方法来实现同步,主要是分为三类:
    • 独占式获取和释放同步状态
    • 共享式获取和释放同步状态
    • 查询同步队列中的等待线程情况。

独占式: 同一时刻,仅有一个线程持有同步状态。

共享式: 共享式与独占式的最主要区别在于,同一时刻:1、独占式只能有一个线程获取同步状态。2、共享式可以有多个线程获取同步状态。

总结:

  • 等待队列是FIFO先进先出。
  • 加入同步队列后,并不是立即挂起,而是再次进行获取同步状态, 到挂起之前都是在自旋(无限循环尝试),因为同步状态的变化很快,线程上下文的切换比较耗时,所以用短暂的自旋来换取时间开销,当然如果一直自旋,那么开销反而大于了线程切换。所以把自旋时间(次数)控制在一定范围有利于提高性能。

参考