概述

  • HashMap是我们用得非常频繁的一个集合,但是由于它是非线程安全的,在多线程环境下,put操作是有可能产生死循环的,导致CPU利用率接近100%。为了解决该问题,提供了Hashtable和Collections.synchronizedMap(hashMap)两种解决方案,但是这两种方案都是对读写加锁,独占式,一个线程在读时其他线程必须等待,吞吐量较低,性能较为低下。故而Doug Lea大神给我们提供了高性能的线程安全HashMap:ConcurrentHashMap。

解析

ConcurrentHashMap的实现

  • ConcurrentHashMap作为Concurrent一族,其有着高效地并发操作,相比Hashtable的笨重,ConcurrentHashMap则更胜一筹了。
  • 在1.8版本以前,ConcurrentHashMap采用分段锁的概念,使锁更加细化,但是1.8已经改变了这种思路,而是利用CAS+Synchronized来保证并发更新的安全,当然底层采用数组+链表+红黑树的存储结构。
  • 我们先用一个示意图来描述下其结构:
  • 结构上和 Java8 的 HashMap 基本上一样,不过它要保证线程安全性,所以在源码上确实要复杂一些。

    初始化

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    // 这构造函数里,什么都不干
    public ConcurrentHashMap() {
    }
    public ConcurrentHashMap(int initialCapacity) {
    if (initialCapacity < 0)
    throw new IllegalArgumentException();
    int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
    MAXIMUM_CAPACITY :
    tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
    this.sizeCtl = cap;
    }
  • put 过程分析

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    public V put(K key, V value) {
    return putVal(key, value, false);
    }
    final V putVal(K key, V value, boolean onlyIfAbsent) {
    // key、value均不能为null
    if (key == null || value == null) throw new NullPointerException();
    // 得到 hash 值
    int hash = spread(key.hashCode());
    // 用于记录相应链表的长度
    int binCount = 0;
    for (Node<K,V>[] tab = table;;) {
    Node<K,V> f; int n, i, fh;
    // 如果数组"空",进行数组初始化
    if (tab == null || (n = tab.length) == 0)
    // 初始化数组,后面会详细介绍
    tab = initTable();

    // 找该 hash 值对应的数组下标,得到第一个节点 f
    else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
    // 如果数组该位置为空,
    // 用一次 CAS 操作将这个新值放入其中即可,这个 put 操作差不多就结束了,可以拉到最后面了
    // 如果 CAS 失败,那就是有并发操作,进到下一个循环就好了
    if (casTabAt(tab, i, null,
    new Node<K,V>(hash, key, value, null)))
    break; // no lock when adding to empty bin
    }
    // hash 居然可以等于 MOVED,这个需要到后面才能看明白,不过从名字上也能猜到,肯定是因为在扩容
    else if ((fh = f.hash) == MOVED)
    // 帮助数据迁移,这个等到看完数据迁移部分的介绍后,再理解这个就很简单了
    tab = helpTransfer(tab, f);

    else { // 到这里就是说,f 是该位置的头结点,而且不为空

    V oldVal = null;
    // 获取数组该位置的头结点的监视器锁
    synchronized (f) {
    if (tabAt(tab, i) == f) {
    if (fh >= 0) { // 头结点的 hash 值大于 0,说明是链表
    // 用于累加,记录链表的长度
    binCount = 1;
    // 遍历链表
    for (Node<K,V> e = f;; ++binCount) {
    K ek;
    // 如果发现了"相等"的 key,判断是否要进行值覆盖,然后也就可以 break 了
    if (e.hash == hash &&
    ((ek = e.key) == key ||
    (ek != null && key.equals(ek)))) {
    oldVal = e.val;
    if (!onlyIfAbsent)
    e.val = value;
    break;
    }
    // 到了链表的最末端,将这个新值放到链表的最后面
    Node<K,V> pred = e;
    if ((e = e.next) == null) {
    pred.next = new Node<K,V>(hash, key,
    value, null);
    break;
    }
    }
    }
    else if (f instanceof TreeBin) { // 红黑树
    Node<K,V> p;
    binCount = 2;
    // 调用红黑树的插值方法插入新节点
    if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
    value)) != null) {
    oldVal = p.val;
    if (!onlyIfAbsent)
    p.val = value;
    }
    }
    }
    }
    // binCount != 0 说明上面在做链表操作
    if (binCount != 0) {
    // 判断是否要将链表转换为红黑树,临界值和 HashMap 一样,也是 8
    if (binCount >= TREEIFY_THRESHOLD)
    // 这个方法和 HashMap 中稍微有一点点不同,那就是它不是一定会进行红黑树转换,
    // 如果当前数组的长度小于 64,那么会选择进行数组扩容,而不是转换为红黑树
    // 具体源码我们就不看了,扩容部分后面说
    treeifyBin(tab, i);
    if (oldVal != null)
    return oldVal;
    break;
    }
    }
    }
    //
    addCount(1L, binCount);
    return null;
    }

get 过程分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 判断头结点是否就是我们需要的节点
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 如果头结点的 hash 小于 0,说明 正在扩容,或者该位置是红黑树
else if (eh < 0)
// 参考 ForwardingNode.find(int h, Object k) 和 TreeBin.find(int h, Object k)
return (p = e.find(h, key)) != null ? p.val : null;

// 遍历链表
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
  • get 方法从来都是最简单的,这里也不例外:
    • 计算 hash 值
    • 根据 hash 值找到数组对应位置: (n – 1) & h
    • 根据该位置处结点性质进行相应查找

总结

其他

  • 何为同步容器?可以简单地理解为通过 synchronized来实现同步的容器,如果有多个线程调用同步容器的方法,它们将会串行执行。
    • 比如 Vector,Hashtable,以及 Collections#synchronizedSet(),Collections#synchronizedList() 等方法返回的容器。
    • 可以通过查看 Vector,Hashtable 等这些同步容器的实现代码,可以看到这些容器实现线程安全的方式就是将它们的状态封装起来,并在需要同步的方法上加上关键字 synchronized 。并发容器,使用了与同步容器完全不同的加锁策略来提供更高的并发性和伸缩性。
    • 例如在 ConcurrentHashMap 中采用了一种粒度更细的加锁机制,可以称为分段锁。在这种锁机制下,允许任意数量的读线程并发地访问 map ,并且执行读操作的线程和写操作的线程也可以并发的访问 map ,同时允许一定数量的写操作线程并发地修改 map ,所以它可以在并发环境下实现更高的吞吐量。
  • SynchronizedMap 和 ConcurrentHashMap 有什么区别?
    • SynchronizedMap
      • 一次锁住整张表来保证线程安全,所以每次只能有一个线程来访为 map 。
    • ConcurrentHashMap
      • 使用分段锁来保证在多线程下的性能。ConcurrentHashMap 中则是一次锁住一个桶。ConcurrentHashMap 默认将 hash 表分为 16 个桶,诸如 get,put,remove 等常用操作只锁当前需要用到的桶。这样,原来只能一个线程进入,现在却能同时有 16 个写线程执行,并发性能的提升是显而易见的。【注意,这块是 JDK7 的实现。在 JDK8 中,具体的实现已经改变】
      • 另外 ConcurrentHashMap 使用了一种不同的迭代方式。在这种迭代方式中,当 iterator 被创建后集合再发生改变就不再是抛出 ConcurrentModificationException 异常,取而代之的是在改变时 new 新的数据从而不影响原有的数据,iterator 完成后再将头指针替换为新的数据 ,这样 iterator 线程可以使用原来老的数据,而写线程也可以并发的完成改变。

        参考

  • http://www.iocoder.cn/JUC/sike/ConcurrentHashMap/
  • http://svip.iocoder.cn/Java/Concurrent/Interview/
  • http://www.importnew.com/28263.html