简介

Condition

  • 在没有 Lock 之前,我们使用 synchronized 来控制同步,配合 Object 的 #wait()、#notify() 等一系列方法可以实现等待 / 通知模式。在 Java SE 5 后,Java 提供了 Lock 接口,相对于 synchronized 而言,Lock 提供了条件 Condition ,对线程的等待、唤醒操作更加详细和灵活。下图是 Condition 与 Object 的监视器方法的对比(摘自《Java并发编程的艺术》):
  • java.util.concurrent.locks.Condition 条件 Condition 接口,定义了一系列的方法,来对阻塞和唤醒线程:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    // ========== 阻塞 ==========
    void await() throws InterruptedException; // 造成当前线程在接到信号或被中断之前一直处于等待状态。
    void awaitUninterruptibly(); // 造成当前线程在接到信号之前一直处于等待状态。【注意:该方法对中断不敏感】。
    long awaitNanos(long nanosTimeout) throws InterruptedException; // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。返回值表示剩余时间,如果在`nanosTimeout` 之前唤醒,那么返回值 `= nanosTimeout - 消耗时间` ,如果返回值 `<= 0` ,则可以认定它已经超时了。
    boolean await(long time, TimeUnit unit) throws InterruptedException; // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
    boolean awaitUntil(Date deadline) throws InterruptedException; // 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。如果没有到指定时间就被通知,则返回 true ,否则表示到了指定时间,返回返回 false 。

    // ========== 唤醒 ==========
    void signal(); // 唤醒一个等待线程。该线程从等待方法返回前必须获得与Condition相关的锁。
    void signalAll(); // 唤醒所有等待线程。能够从等待方法返回的线程必须获得与Condition相关的锁。
  • Condition是个接口,基本的方法就是await()和signal()方法;

  • Condition依赖于Lock接口,生成一个Condition的基本代码是lock.newCondition()
  • 调用Condition的await()和signal()方法,都必须在lock保护之内,就是说必须在lock.lock()和lock.unlock之间才可以使用
    • Conditon中的await()对应Object的wait();
    • Condition中的signal()对应Object的notify();
    • Condition中的signalAll()对应Object的notifyAll()。
  • 示例代码

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    public class ConditionTest {
    final Lock lock = new ReentrantLock();
    // 获取的是ConditionObject
    final Condition condition = lock.newCondition();

    public static void main(String[] args) {
    ConditionTest conditionTest = new ConditionTest();
    Consumer consumer = conditionTest.new Consumer();
    Producer producer = conditionTest.new Producer();

    consumer.start();
    producer.start();

    }

    class Consumer extends Thread {
    @Override
    public void run() {
    try {
    lock.lock();
    System.out.println("消费者:我在等一个新信号" + currentThread().getName());
    condition.await();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } finally {
    System.out.println("消费者:拿到一个信号" + currentThread().getName());
    lock.unlock();
    }
    }
    }
    class Producer extends Thread {
    @Override
    public void run() {
    try {
    lock.lock();
    System.out.println("生产者:我拿到了锁" + currentThread().getName());
    condition.signalAll();
    System.out.println("生产者:我发出了一个信号" + currentThread().getName());
    } finally {

    lock.unlock();
    }
    }
    }
    }
  • 运行结果

    1
    2
    3
    4
    5
    6
    7
    "C:\Program Files\Java\jdk1.7.0_80\bin\java" "-javaagent:C:\Program Files\JetBrains\IntelliJ IDEA 2017.2.1\lib\idea_rt.jar=54974:C:\Program Files\JetBrains\IntelliJ IDEA 2017.2.1\bin" -Dfile.encoding=UTF-8 -classpath "C:\Program Files\Java\jdk1.7.0_80\jre\lib\charsets.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\deploy.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\ext\access-bridge-64.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\ext\dnsns.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\ext\jaccess.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\ext\localedata.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\ext\sunec.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\ext\sunjce_provider.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\ext\sunmscapi.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\ext\zipfs.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\javaws.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\jce.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\jfr.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\jfxrt.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\jsse.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\management-agent.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\plugin.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\resources.jar;C:\Program Files\Java\jdk1.7.0_80\jre\lib\rt.jar;D:\workspace-github\jvm\target\production\jvm" com.songsy.jdk.concurrent.ConditionTest
    消费者:我在等一个新信号Thread-0
    生产者:我拿到了锁Thread-1
    生产者:我发出了一个信号Thread-1
    消费者:拿到一个信号Thread-0

    Process finished with exit code 0
  • Condition的执行方式,是当在线程Consumer中调用await方法后,线程Consumer将释放锁,并且将自己沉睡,等待唤醒,线程Producer获取到锁后,开始做事,完毕后,调用Condition的signalall方法,唤醒线程Consumer,线程Consumer恢复执行。

  • 以上说明Condition是一个多线程间协调通信的工具类,使得某个,或者某些线程一起等待某个条件(Condition),只有当该条件具备( signal 或者 signalAll方法被带调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。

ConditionObject

  • 获取一个 Condition 必须要通过 Lock 的 #newCondition() 方法。该方法定义在接口 Lock 下面,返回的结果是绑定到此 Lock 实例的新 Condition 实例。Condition 为一个接口,其下仅有一个实现类 ConditionObject ,由于 Condition 的操作需要获取相关的锁,而 AQS则是同步锁的实现基础,所以 ConditionObject 则定义为 AQS 的内部类。代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public class ConditionObject implements Condition, java.io.Serializable {
/**
* 我们知道一个Condition可以在多个地方被await(),那么就需要一个FIFO的结构将这些Condition串联起来,
* 然后根据需要唤醒一个或者多个(通常是所有)。所以在Condition内部就需要一个FIFO的队列。
*/
/** First node of condition queue. */
private transient Node firstWaiter; // 头节点
/** Last node of condition queue. */
private transient Node lastWaiter; // 尾节点

public ConditionObject() {
}

// ... 省略内部代码
}
  • 从代码中可以看出ConditionObject 拥有首节点(firstWaiter),尾节点(lastWaiter)。当前线程调用 #await()方法时,将会以当前线程构造成一个节点(Node),并将节点加入到该队列的尾部
  • Node 里面包含了当前线程的引用。Node 定义与 AQS 的 CLH 同步队列的节点使用的都是同一个类(AbstractQueuedSynchronized 的 Node 静态内部类)。

大体实现流程

  • AQS等待队列与Condition队列是两个相互独立的队列
    • await()就是在当前线程持有锁的基础上释放锁资源,并新建Condition节点加入到Condition的队列尾部,阻塞当前线程
    • signal()就是将Condition的头节点移动到AQS等待节点尾部,让其等待再次获取锁
  • 以下是AQS队列和Condition队列的出入结点的示意图,可以通过这几张图看出线程结点在两个队列中的出入关系和条件。
  • 一:AQS等待队列有3个Node,Condition队列有1个Node(也有可能1个都没有),节点1执行Condition.await()
    • 1.将head后移
    • 2.释放节点1的锁并从AQS等待队列中移除
    • 3.将节点1加入到Condition的等待队列中
    • 4.更新lastWaiter为节点1

  • 二:节点2执行signal()操作
    • 5.将firstWaiter后移
    • 6.将节点4移出Condition队列
    • 7.将节点4加入到AQS的等待队列中去
    • 8.更新AQS的等待队列的tail
  • await

    • 调用 Condition 的 #await() 方法,会使当前线程进入等待状态,同时会加入到 Condition 等待队列,并且同时释放锁。当从 #await() 方法结束时,当前线程一定是获取了Condition 相关联的锁。
  • signal

    • 调用 ConditionObject的 #signal() 方法,将会唤醒在等待队列中等待最长时间的节点(条件队列里的首节点),在唤醒节点前,会将节点移到CLH同步队列中。

示例

  • 生产者和消费者
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    public class ConsumerProducer {
    private int storage;
    private int putCounter;
    private int getCounter;
    private Lock lock = new ReentrantLock();
    private Condition putCondition = lock.newCondition();
    private Condition getCondition = lock.newCondition();

    public void put() throws InterruptedException {
    try {
    lock.lock();
    if (storage > 0) {
    putCondition.await();
    }
    storage++;
    System.out.println("put => " + ++putCounter );
    getCondition.signal();
    } finally {
    lock.unlock();
    }
    }

    public void get() throws InterruptedException {
    try {
    lock.lock();
    lock.lock();
    if (storage <= 0) {
    getCondition.await();
    }
    storage--;
    System.out.println("get => " + ++getCounter);
    putCondition.signal();
    } finally {
    lock.unlock();
    lock.unlock();
    }
    }

    public class PutThread extends Thread {
    @Override
    public void run() {
    for (int i = 0; i < 100; i++) {
    try {
    put();
    } catch (InterruptedException e) {
    }
    }
    }
    }

    public class GetThread extends Thread {
    @Override
    public void run() {
    for (int i = 0; i < 100; i++) {
    try {
    get();
    } catch (InterruptedException e) {
    }
    }
    }
    }

    public static void main(String[] args) {
    final ConsumerProducer test = new ConsumerProducer();
    Thread put = test.new PutThread();
    Thread get = test.new GetThread();
    put.start();
    get.start();
    }
    }

总结:

  • 一个线程获取锁后,通过调用 Condition 的 #await() 方法,会将当前线程先加入到条件队列中,然后释放锁,最后通过 #isOnSyncQueue(Node node) 方法,不断自检看节点是否已经在 CLH 同步队列了,如果是则尝试获取锁,否则一直挂起。
  • 当线程调用 #signal() 方法后,程序首先检查当前线程是否获取了锁,然后通过#doSignal(Node first) 方法唤醒CLH同步队列的首节点。被唤醒的线程,将从 #await() 方法中的 while 循环中退出来,然后调用 #acquireQueued(Node node, int arg) 方法竞争同步状态。

参考