Java 并发:ConcurrentHashMap
ConcurrentHashMap(以下简称CHM) 对读取提供了完全的并发支持,对写入提供了高性能的并发支持。在读取数据时,CHM 无需上锁;而在写入数据时,也不用对整个 Map 上锁。
由于读取数据不上锁,读取时可能与写入发生重叠,CHM 只能反映最近完成的写入所产生的结果。与 volatile 类似,这也是符合 happens-before 原则的:如果通过 CHM 读取到某次写入后的值,那么写入前的程序的执行一定先于读取后的程序。
与 CopyOnWriteArrayList 相同,CHM 的迭代器在进行迭代时,不会因为写入或删除操作而报出 ConcurrentModificationException。但是,每次获得的迭代器只能被一个线程所使用,不应当传送给其他线程。一旦一个迭代器被创建,其状态就是确定的(一个快照),类似 size()
\ isEmpty()
\ containsValue()
等访问实时数据的函数就不能够用来控制程序了。
一、初始化#
CHM 中比较重要的成员变量如下:
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable {
// 最大容量,hashcode前两位用作控制,所以最大为1<<30
private static final int MAXIMUM_CAPACITY = 1 << 30;
// 默认容量
private static final int DEFAULT_CAPACITY = 16;
// 负载因子,需要扩容的键值对临界值和容量的比值,默认情况下用 n - (n >>> 2) 实现
private static final float LOAD_FACTOR = 0.75f;
// 链表转化为红黑树的临界值
static final int TREEIFY_THRESHOLD = 8;
// 红黑树转化为链表的临界值
static final int UNTREEIFY_THRESHOLD = 6;
// 启用红黑树转化的最小容量,小于这个值时,优先扩容
static final int MIN_TREEIFY_CAPACITY = 64;
// 标记状态
static final int MOVED = -1; // hash for forwarding nodes
static final int TREEBIN = -2; // hash for roots of trees
static final int RESERVED = -3; // hash for transient reservations
static final int HASH_BITS = 0x7fffffff; // usable bits of normal node hash
// 现有表
transient volatile Node<K,V>[] table;
// 更新后的表
private transient volatile Node<K,V>[] nextTable;
/* 用来控制表的初始化和扩容,具有以下几种状态
* -1 : 正在初始化
* < -1: 正在被-sizeCtl-1个线程扩容
* >= 0: 未初始化时,初始化的容量;初始化后,下一次扩容的容量。*/
private transient volatile int sizeCtl;
}
初始化时,若未指定初始容量,根据默认值创建;若指定了初始容量,将 sizeCtl 设置为下一次扩容的容量,即乘以 1.5 倍后向上取整(2 的幂)。
public ConcurrentHashMap() {
}
// 指定大小
public ConcurrentHashMap(int initialCapacity) {
if (initialCapacity < 0)
throw new IllegalArgumentException();
// 下一次扩容的容量
int cap = ((initialCapacity >= (MAXIMUM_CAPACITY >>> 1)) ?
MAXIMUM_CAPACITY :
tableSizeFor(initialCapacity + (initialCapacity >>> 1) + 1));
this.sizeCtl = cap;
}
// 向上取整(2的幂)的快速算法
private static final int tableSizeFor(int c) {
int n = c - 1;
n |= n >>> 1;
n |= n >>> 2;
n |= n >>> 4;
n |= n >>> 8;
n |= n >>> 16;
return (n < 0) ? 1 : (n >= MAXIMUM_CAPACITY) ? MAXIMUM_CAPACITY : n + 1;
}
不管是哪种初始化方式,都没有创建 table。这是一种懒加载策略,只有在第一次插入数据时,table 才会被创建。table 中存放着 Node 类保存的键值对,在设计时应保证其安全的发布。
static class Node<K,V> implements Map.Entry<K,V> {
final int hash; // 不可变
final K key; // 不可变
volatile V val; // 可修改
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
// 在其子类中将被重写,转化为红黑树后查找逻辑是不同的
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
二、写入#
为了尽可能减小锁的粒度,CHM 中引入了 Unsafe 提供的硬件支持的原子操作:
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
Node<K,V> c, Node<K,V> v) {
return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}
put()
调用了 putVal()
,传入 putVal()
的 onlyIfAbsent 表示是否只有在不存在相应 key 时才加入。可见 put()
始终传入 false,而另一个 public 方法 putIfAbsent()
则始终传入 true。putValue()
首先通过 (hashcode ^ (hashcode >>> 16)) & HASH_BITS
获取 hash 值。然后在一个循环中处理一下四种情况,直到跳出:
- table 未初始化:初始化 table;
- table 已经初始化,但是相应桶为空:直接创建 Node 加入其中,跳出;
- table 已经初始化,且相应桶不为空,但该桶正在迁移:当前线程将帮助迁移;
- table 已经初始化,且相应桶不为空,且该桶未在迁移,将键值对加入桶中:
- 桶中是链表,遍历这个链表:
- 若存在:直接修改 value,跳出;
- 若不存在:创建新 Node 加到尾部,跳出。
- 桶中是红黑树:通过 TreeNode 中
putTreeValue()
写入,逻辑与上面类似,跳出。
- 桶中是链表,遍历这个链表:
在完成上面步骤之后,还会检查 binCount,如果满足转化红黑树的条件则进行转化。
public V put(K key, V value) {
return putVal(key, value, false);
}
// onlyIfAbsent为true时,只能添加不能修改
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); // 取有效hash
int binCount = 0; // 计算桶内个数,用以控制扩容或树的转化
for (Node<K,V>[] tab = table;;) { // 循环,直到写入完成跳出
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); // 【未初始化则先初始化】
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // 桶为空,通过原子操作写入,不需要加锁
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // 该节点正在迁移,该线程也帮助迁移
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) { // 上锁和检查头节点是否有变
if (fh >= 0) { // 大于等于0,则为链表
binCount = 1;
// 遍历同时记录binCount
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val;
if (!onlyIfAbsent)
e.val = value;
break; // 找到了,则修改
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null);
break; // 没找到,则添加在尾部
}
}
}
else if (f instanceof TreeBin) { // 处理红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD)
treeifyBin(tab, i); // 【链表长度超过临界,转化为红黑树或者扩容】
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount); // 【计数,超过sizeCtl会进行扩容】
return null;
}
putVal()
中有三个函数是重点:initTable()
\ treeifyBin()
\ addCount
。 initTable()
必须要保证只有一个线程对 table 进行初始化。treeifyBin()
和 addCount
中都包含了扩容操作,但是扩容的结果却是不同的。为了便于区分,我们将 treeifyBin()
中使用 tryPresize()
进行的扩容称为 P 扩容,而将由 addCount()
进行的扩容称为 C 扩容。
2.1 initTable#
不管是初始化 table 还是对 table 进行扩容,sizeCtl 都是至关重要的一环。在初始化 table 时,并不能保证只有当前线程需要初始化,当有多个线程同时初始化 table 时,竞争就出现了。通过 sizeCtl 处理竞争很容易,若当前线程赢得了竞争,开始初始化,则把 sezeCtl 设置为 -1(这个操作必须原子性的)。其他线程在读到 sizeCtl = -1 时就放弃初始化,进而 yield()
让出资源等待初始化的完成,这就是为什么需要 while 循环的原因。但是这里还有两个问题:
- 为什么在检查 table 是否初始化时需要
(tab = table) == null || tab.length == 0
,似乎tab.length == 0
总是不能单独成立的? - 为什么最后的
sizeCtl = sc
需要 finally 句式来保证其一定执行。
第一个问题我还没有答案,不过第二问题有以下解释。在创建新的 table 时,可能会因为空间不足而报出 OutOfMemoryError
,该线程就无法执行下去,如果没有 finally 保证 sizeCtl = sc
一定执行,sizeCtl 将永远为 -1,而 table 也将永远无法创建。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0)
Thread.yield(); // lost initialization race; just spin
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
2.2 treeifyBin#
如果插入一个键值对后,链表长度大于等于 TREEIFY_THRESHOLD,就需要进行扩容或转化红黑树。treeifyBin()
是这一操作的入口,首先判断 table 容量是否小于 MIN_TREEIFY_CAPACITY,如果是则进行 P 扩容(由 tryPresize()
进行的,有别于下文中由 addCount()
进行的),否则进行红黑树转化。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) // 扩容
tryPresize(n << 1);
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { // 转化红黑树
synchronized (b) {
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) {
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
2.3 addCount#
addCount()
通过一个 CounterCell 数组计算键值对个数,虽然只是简单的计数,但为了尽可能减少线程冲突,其中蕴含了非常巧妙的设计。这部分内容将在第四章中介绍。
addCount()
中的 check 用来标识是否需要考虑扩容,putVal()
调用时, check 值实际上就是 putVal()
中的 binCount,有以下可能:
- 写入前桶内为空:0;
- 写入前桶内为链表:写入的节点位置;
- 写入前桶内为红黑树:2。
当 check 大于等于 0 且满足以下三个条件时,就需要进行 C 扩容:
- 键值对数达到sizeCtl;
- table非空;
- table长度非最大值。
实际上,C 扩容的步骤和 tryPresize()
中的一个分支完全一致,可以理解为 C 扩容是 P 扩容的一个子集,这部分将在下一章中介绍。
private final void addCount(long x, int check) {
// 计数部分,暂不考虑
CounterCell[] as; long b, s;
...
s = sumCount(); // 键值对数
// 判断是否C扩容
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
// 满足三个条件则C扩容:1.size达到sizeCtl;2.table非空;3.table长度非最大值
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
// 以下为C扩容,和tryPresize()中的一个分支完全一致
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
三、扩容#
CHM 的扩容非常有趣,正如上文所说,当前线程如果发现自己要处理的桶正在迁移,并不会干等着,而是利用自己的计算资源帮助完成整个 table 的迁移。
另一个有趣的点是,对于同样长度的 table,可能扩容出不同长度的新 table。来看下面一个例子:
// 测试例
public class ConcurrentMapTest {
public static void main(String[] args)
throws NoSuchFieldException, IllegalAccessException {
final int DEFAULT_CAPACITY = 16; // 默认容量
ConcurrentHashMap<Integer, Integer> map1 = new ConcurrentHashMap<>();
ConcurrentHashMap<Integer, Integer> map2 = new ConcurrentHashMap<>();
// c扩容
for (int i = 0; i < 12; i++) {
map1.put(i, i); // key不同
int length;
if ((length = getLength(map1)) != DEFAULT_CAPACITY) {
System.out.printf("第%d次插入后map1扩容:%d\n", i + 1, length);
break;
}
}
// p扩容
for (int i = 0; i < 12; i++) {
map2.put(i * 16, i); // key不同
int length;
if ((length = getLength(map2)) != DEFAULT_CAPACITY) {
System.out.printf("第%d次插入后map2扩容:%d\n", i + 1, length);
break;
}
}
}
// 通过反射获取table.length
private static int getLength(ConcurrentHashMap map)
throws NoSuchFieldException, IllegalAccessException {
Field tableField = ConcurrentHashMap.class.getDeclaredField("table");
tableField.setAccessible(true);
Object[] table = (Object[]) tableField.get(map);
return (table == null ? 0 : table.length);
}
}
第12次插入后map1扩容:32
第9次插入后map2扩容:128
上面两个 map 扩容后出现了不同的容量,这是因为两者触发了不同的扩容机制。map1 中每次插入的 key 为 1、2、3 等,由于 Integer 的 hashcode 等于其自身的开箱值,可以确保 map1 中的键值对是均匀分布的。因此,导致 map1 扩容的原因是键值对的数量达到sizeCtl(12)。map2 中每次插入的是 16 的倍数,所以每次插入的键值对都放进了第一个桶内,这导致第一个桶的链表越来越长,当长度大于 TREEIFY_THRESHOLD(8)时,触发了 treeifyBin()
,根据 treeifyBin()
的逻辑,map2 当前容量小于 MIN_TREEIFY_CAPACITY(64),优先通过 tryPresize()
进行 P 扩容。
3.1 P 扩容#
从上面的测试例可以看出,P 扩容正好是 C 扩容的四倍,这是巧合吗?上一章节中,我们已经对 addCount()
有了初步的认识,它会先计数,然后根据计数判断是否进行 C 扩容。在具体分析 C 扩容前,先来看看 tryPresize()
是如何实现 P 扩容的,也许能找到他们之间的一些联系。
首先 tryPresize()
会获得一个扩容要求 c,他是由传入 size 乘以 1.5 倍后向上舍入得到的。例如,初始 table.length=16,传入 size=32,可以得到 c=64。得到 c 后,通过 while 循环不断检验 sizeCtl 是否大于 c,有以下分支:
- table 未初始化:初始化 table;
- sizeCtl >= c:扩容完毕,跳出;
- 其他:进行 C 扩容。
现在我们知道,P 扩容实际上提供了 table 的初始化(在对空 map 进行 putAll()
时需要)和扩容的终止条件,其余时间都是在循环 C 扩容。上面的例子中,P 扩容后容量是 C 扩容的四倍,说明 P 扩容进行了三次 C 扩容。
- table.length=32,c=64 > sizeCtl=24;
- table.length=64,c=64 > sizeCtl=48;
- table.length=128,c=64 < sizeCtl=96,跳出。
总结,C 扩容将 table.length 翻倍,而 P 扩容进行若干次 C 扩容,直到满足条件。之所以 P 扩容比 C 扩容大得多,是因为触发 P 扩容时,map 已经察觉到明显的哈希冲突了。
// 注意: 如果是由treeifyBin()调用,传入的size已经是两倍的原table长度
private final void tryPresize(int size) {
int c = (size >= (MAXIMUM_CAPACITY >>> 1)) ? MAXIMUM_CAPACITY :
tableSizeFor(size + (size >>> 1) + 1); // 获得扩容要求c
int sc;
while ((sc = sizeCtl) >= 0) {
Node<K,V>[] tab = table; int n;
// 如果table没初始化,先初始化
if (tab == null || (n = tab.length) == 0) {
n = (sc > c) ? sc : c;
if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
try {
if (table == tab) {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = nt;
sc = n - (n >>> 2);
}
} finally {
sizeCtl = sc;
}
}
}
// 满足扩容要求,扩容完毕,跳出
else if (c <= sc || n >= MAXIMUM_CAPACITY)
break;
// 与C扩容一致
else if (tab == table) {
int rs = resizeStamp(n); // 获得扩容标志rs
// 有其他线程正在扩容
if (sc < 0) {
Node<K,V>[] nt;
// 出现以下情况不应该帮助迁移,见下文
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
// 原子操作将sizeCtl中的线程数加一
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
// 把table迁移到nextTable
transfer(tab, nt);
}
// 没有其他线程在扩容,通过原子操作尝试扩容,把sizeCtl设置为相应负数
// 原子操作成功,将(rs << RESIZE_STAMP_SHIFT) + 2)赋值给sizeCtl
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
// 创建一个两倍长度的新table,并赋值到nextTable上
// 开始迁移
transfer(tab, null);
}
}
}
3.2 C 扩容#
C 扩容的逻辑相对简单,但要理解它却比较困难,罪魁祸首就是我们老朋友 sizeCtl!
源码中的注解表示,当多个线程同时进行扩容时,sizeCtl=-(1+N),其中 N 表示正在扩容的线程数。如果按照这个逻辑,在扩容时,sizeClt 应当是一个比较接近 0 的负数, 但事实上 sizeCtl 通常都是一个非常小的负数。出现这种情况的原因是 sizeCtl 的高 16 位被用来标识 table 了,其低 16 位才标识 1+N。先来看扩容标识 rs 是怎么获得的。
static final int resizeStamp(int n) {
// 取前导0的个数,并把低16位的第一位置1 RESIZE_STAMP_BITS默认16
return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
}
为了方便理解,还是以初始长度为 16 的 table 为例,前导零一共 27 个,可得:
rs = 0000 0000 0000 0000 1000 0000 0010 0111
(置1) (27)
有了扩容标识 rs 后,判断 sizeCtl,如果 sizeCtl 为负,说明有其他线程正在扩容;如果 sizeCtl 非负,则通过原子操作将 sizeCtl 设置为相应负数,原子操作成功就开始扩容。
U.compareAndSwapInt(this, SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2);
// 原子操作成功,则siztCtl = 1000 0000 0010 0111 0000 0000 0000 0010
原子操作成功后,sizeCtl 就变成了一个很小的负数,事实上,我们不应该关注这个值本身的大小,只需要关心其高 16 位和低 16 位的真正含义就可以了。sizeCtl 修改完毕后,transfer(tab, null)
将先创建一个两倍长度的 nextTable,然后开始将 table 上的数据迁移到 nextTable 上。
这时候其他线程就会觉察到正在迁移(sizeCtl<0)并试图帮助迁移。首先判断是否有资格帮助迁移,以下情况将不予参与扩容并直接跳出:
- (sc »> RESIZE_STAMP_SHIFT) != rs:扩容标识对不上,说明不是同一 table,帮不上忙;
- sc == rs + 1:扩容已经结束,收工了;
- sc == rs + MAX_RESIZERS:已经达到最大扩容线程数,人手够了;
- (nt = nextTable) == null:nextTab还没创建,工厂还没建完;
- transferIndex <= 0:所有桶都已经分配出去,来晚了一步。
如果有资格参与扩容,就进行一次原子操作对 sizeCtl 加一,成功就可以 transfer(tab, nt)
,将 table 中的数据迁移到 nextTable。
3.3 transfer#
终于,关于扩容只剩下最后一个问题需要搞清楚——table 和 nextTable 之间的数据是如何迁移的。
transfer()
非常长,包含了很多逻辑,仔细拆分后能更好理解。我们首先列出总体框架,然后对细节进行推敲。
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 1.根据CPU数确定划分table的步长
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
// 2.若传入的nextTab为null,则创建nextTable
if (nextTab == null) { // initiating
try {
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1];
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n; // 用于控制扩容区间的分配
}
int nextn = nextTab.length;
// 用于标记已处理的空桶
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
// 3.循环处理,i表示当前处理的桶下标(原table),bound表示分得区间的下界
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 领取区间 / 控制桶的序号 i--
while (advance) {
... // 详见3.3.1节
}
// 判断是否结束
if (i < 0 || i >= n || i + n >= nextn) {
... // 详见3.3.2节
}
// 当前桶为空,插入一个 fwd
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// 当前桶已经迁移
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
// 迁移桶内数据
else {
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// 处理节点是链表的情况
if (fh >= 0) {
... // 详见3.3.3节
}
// 处理节点是红黑树的情况
else if (f instanceof TreeBin) {
... // 详见3.3.3节
}
}
}
}
}
}
我们先将 transfer()
拆分为以下几个部分:
- 根据 CPU 数确定划分 table 的步长:
- 如果是单核 CPU,步长就是 table.length;
- 如果是多核 CPU,步长为
max((n >>> 3) / NCPU, MIN_TRANSFER_STRIDE)
。
- 若传入的 nextTab 为 null,则创建 nextTable:
- 将创建的 nextTab 赋给全局变量 nextTable;
- 将用于控制扩容区间的分配 transferIndex 初始化为 table.length。
- 开始循环处理区间内各个桶得迁移,由 advance 控制程序:
- while 循环对区间 [bound, i] 进行分配或调整(–i);
- 开始下面四个分支:
- i 越界:判断当前区间的迁移是否结束;
- 当前桶为空:插入一个 fwd;
- 当前桶已经迁移:跳过;
- 其他:迁移桶内数据,包括链表和红黑树。
首先,根据 CPU 的核心数来划分迁移的步长,步长就是每个线程需要处理的桶个数。如果是单核,由于没有并行能力,线程直接处理整个 table 即可;如果是多线程,通过 (n >>> 3) / NCPU
计算步长,但是必须要大于 MIN_TRANSFER_STRIDE,默认 16。
接着,如果传入的 nextTab 为 null,就需要创建 nextTable 并赋值给全局变量。同时,将全局变量 transferIndex 设为 table.length,这个变量告诉所有线程迁移进行到了哪个分区。
最后开始一个循环,每次循环处理一个桶,先对区间进行控制,然后由四个分支处理。这四个分支中,当前桶为空和当前桶已迁移比较简单。
当前桶为空的情况,会插入一个 ForwardingNode 类的 fwd,它的状态码为 MOVED。当其他线程进行对该桶 putVal()
时,会调用 helpTransfer()
帮助迁移;其他线程对该桶迁移时,则会直接跳过。最后把 advance 设置为插入 fwd 的原子操作的返回值,如果成功了下一个循环将迁移下一个桶或结束,如果失败了下一个循环还会继续处理当前桶。
当前桶已迁移的情况,直接跳过,将 advance 设置为 true。
剩下还有三个部分需要继续分析:控制区间、结束条件和迁移数据。
3.3.1 控制区间
在 for (int i = 0, bound = 0;;)
中初始化了两个变量 bound 和 i,bound 表示区间下界,i 表示当前处理桶的下标。
while (advance) {
int nextIndex, nextBound;
// 选择下一个桶或处理结束信号
if (--i >= bound || finishing)
advance = false;
// 没有新的区间可以分配
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 分配区间
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
第一次循环前两个分支都不成立,将由分支三分配区间。假设原 table 的长度为 64,步长 strde 为 16,控制区间的过程大致如下:
- 初始状态下,transferIndex=64;
- 分配区间,通过原子操作将 transferIndex 减 stride,将修改后的 transferIndex 赋值给 bound,将修改前的 transferIndex 赋值给 i;
- 对 i 自减 1,不断处理每一个桶;
- 当 i 小于 bound 时,触发下一次分配。
1. |————————|————————|————————|————————|
0 transferIndex=64
2. |————————|————————|————————|————————|
0 transferIndex=48
bound=48
i=63
3. |————————|————————|————————|————————|
0 transferIndex=48
bound=48
i=62
|————————|————————|————————|————————|
0 transferIndex=48
bound=48
i=47
4. |————————|————————|————————|————————|
0 transferIndex=32
bound=32
i=47
3.3.2 结束条件
判断结束条件围绕 finishing 变量展开,当 i 小于 0 时,当前线程已经完成了 table 最后的一个区间,对 finishing 进行判断:
- finishing 为 true:
- 全局变量 nextTable 置空;
- 替换 table;
- 设置 sizeCtl。
- finishing 为 false:
- 原子操作对 sizeCtl 中的线程数减 1;
- 判断是否所有线程都结束工作,否则返回,是则继续;
- finishing 置 true。
- i 置 taable.length,下一个循环开始全局检查。
// i<0时,已经完成扩容,暂不知道(i>=n || i+n>=nextn)的含义
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
// finishing为true则结束
if (finishing) {
nextTable = null; // 置空
table = nextTab; // 替换
sizeCtl = (n << 1) - (n >>> 1); // sizeCtl设为0.75倍的table.length
return;
}
// finishing为false,但是该线程的任务已经完成,sizeCtl中的线程数减1
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
// 判断sizeCtl是否回到刚开始扩容的状态
// 是则说明所有用于迁移的线程都结束工作,否则直接返回
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
// finishing置为true
finishing = advance = true;
// i置为table.length,全部扫描一遍,确定所有桶都已经完成迁移
i = n; // recheck before commit
}
}
3.3.3 迁移链表
迁移的逻辑和 HashMap 类似,对于节点 node,将其 hash 码和 table.length 进行位与,由于 table.length 总是 2 的幂,位与的结果不是 0 就是 table.length。将得到 0 的 node 放在低位桶,其余放在高位桶。
// 上锁,与putVal()同步,f为当前桶的头节点
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// hash>0为链表,树节点的hash=-2
if (fh >= 0) {
// 决定移动到低位桶还是高位桶
int runBit = fh & n;
Node<K,V> lastRun = f;
// 找到最后将迁移到同一个桶的所有节点,
// 这部分不需要创建新的节点,而是直接迁移
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
// 根据每个节点的hash值迁移,由于节点中的key是不可变的,需要创建新的节点
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
// 头插法,创建新节点时把自己作为next节点传入
// 最后链表的顺序将会颠倒(除了lastRun之后的)
ln = new Node<K,V>(ph, pk, pv, ln);
else
// 同上
hn = new Node<K,V>(ph, pk, pv, hn);
}
setTabAt(nextTab, i, ln); // 原子操作把低位桶置入新表
setTabAt(nextTab, i + n, hn); // 原子操作把高位桶置入新表
setTabAt(tab, i, fwd); // 原子操作把fwd置入旧表表示已经迁移
advance = true;
}
else if (f instanceof TreeBin) {
...
}
}
}
我们以长度为 8 的 table 为例,在 table 的第一个桶内插入了 9 条数据,这时候触发了扩容。用 [key] 表示这些 key 的值,并求得每一个节点的 runBit。对这些节点遍历,找到 lastRun 的位置。
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | table
[0] : 0000 0000 & 1000 = 0
[8] : 0000 1000 & 1000 = 8
[16]: 0001 0000 & 1000 = 0
[24]: 0001 1000 & 1000 = 8
[32]: 0010 0000 & 1000 = 0
[40]: 0010 1000 & 1000 = 8
[48]: 0011 0000 & 1000 = 0 <-lastRun
[64]: 0100 0000 & 1000 = 0
[80]: 0101 0000 & 1000 = 0
[key] runBit
再次遍历所有节点直到 lastRun 为止,不断把节点添加到 ln 和 hn 上。由于遍历顺序和添加节点的顺序相反,最后链表的顺序将会颠倒(除了 lastRun 之后的)。
ln hn
| 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 | 12 | 13 | 14 | 15 | newTable
[32] - [40] -
[16] - [24] -
[0] <- reversed [8] <- reversed
[48] <- lastRun
[64]
[80]
3.3.4 迁移红黑树
红黑树的迁移与链表类似,只是处理数据结构不同。不过需要注意红黑树拆分后可能就不足 UNTREEIFY_THRESHOLD,默认 6,这时候需要转化为链表。
synchronized (f) {
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
if (fh >= 0) {
...
}
// 节点为红黑树
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
// 遍历
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
// 和链表相同的判断,与运算==0的放在低位
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
} // 不是0的放在高位
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 如果树的节点数小于等于 6,那么转成链表,反之,创建一个新的树
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
// 低位树
setTabAt(nextTab, i, ln);
// 高位数
setTabAt(nextTab, i + n, hn);
// 旧的设置成占位符
setTabAt(tab, i, fwd);
// 继续向后推进
advance = true;
}
}
}
四、计数#
通过 size()
可以获得当前键值对的数量,它将 sumCount()
获得的 long 类型的值转化为 int 返回。
sumCount()
则计算 baseCount 字段与 counterCells 数组中所有非空元素的记录值的和。
// 未发生争用前都用它计数
private transient volatile long baseCount;
// 用于同步counterCells数组结构修改的乐观锁资源
private transient volatile int cellsBusy;
// 支持多个线程同时通过counterCells中的多个元素计数
private transient volatile CounterCell[] counterCells;
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
// 计算 baseCount 字段与所有 counterCells 数组的非空元素的和
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount;
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
由此我们可以推断,CHM 将键值对数量保存在全局变量 baseCount 和 CounterCell 数组 counterCells 中。CounterCell 是 CHM 的静态内部类,结构很简单,仅仅保存一个 volatile 修饰的 long 值。因此,每一个 CounterCell 对象占用空间都很小,当一个 CounterCell 数组创建时,其中的多个元素可能处在同一缓存行内,这会导致一种伪共享的现象,极大降低并发效率。
@sun.misc.Contended
的作用就是保证修饰对象不会出现伪共享,原理是在对象或字段的前后各增加 128 字节大小的 padding,使用 2 倍于大多数硬件缓存行的大小来避免相邻扇区预取导致的伪共享冲突。
@sun.misc.Contended static final class CounterCell {
volatile long value;
CounterCell(long x) { value = x; }
}
那么键值对的数量是如何保存到 baseCount 和 counterCells 中的呢?
在第二章中,我们介绍了 addCount()
,它会先对 CHM 中所有键值对计数,然后考虑是否扩容。现在我们来看看它是如何计数的。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 第一层if
if ((as = counterCells) != null ||
!U.compareAndSwapLong(this, BASECOUNT, b = baseCount, s = b + x)) {
CounterCell a; long v; int m;
boolean uncontended = true;
// 第二层if
if (as == null || (m = as.length - 1) < 0 ||
(a = as[ThreadLocalRandom.getProbe() & m]) == null ||
!(uncontended =
U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) {
fullAddCount(x, uncontended);
return;
}
if (check <= 1)
return;
s = sumCount();
}
if (check >= 0) {
...
}
}
addCount()
中有两层 if,其中蕴含了非常巧妙的设计,第一层:
- counterCells 不为空:跳转第二层 if;
- counterCells 为空:不着急创建 counterCells,先用原子操作增加 baseCount:
- 成功:结束计数;
- 失败:跳转第二层 if。
第二层 if 就是用来创建 counterCells 以及使用它来计数的:
- counterCells 为空或 counterCells[probe] 为空:调用
fullAddCount(x, true)
; - counterCells[probe] 已经创建:原子操作增加其 value 值:
- 成功:结束计数;
- 失败:调用
fullAddCount(x, false)
。
probe 可以理解为每个线程特有的哈希值(不同在于 probe 是可变的)与 counterCells.length - 1 的位与,目的是为了让每个线程都拥有属于自己的 counterCell,这样每个线程增加计数时就不会线程冲突了。由此可见 counterCells 本身就是一个小的哈希表,当 counterCells 为空或 counterCells[probe] 为空时,就需要 fullAddCount(x, uncontended)
来创建,此时 uncontended 为 true。
不幸的是,既然是哈希表,就存在哈希冲突。如果原子操作争用 counterCells[probe](线程冲突),就需要 fullAddCount(x, uncontended)
解决哈希冲突和线程冲突,此时 uncontended 为 false。也就是说,这里的冲突包含两种含义:线程的哈希冲突和线程的竞争冲突。
4.1 fullAddCount#
fullAddCount()
可真是全能,来看看它是如何实现的。依旧,先理清框架,再探究细节。
private final void fullAddCount(long x, boolean wasUncontended) {
int h;
// 判断线程哈希值是否初始化
if ((h = ThreadLocalRandom.getProbe()) == 0) {
ThreadLocalRandom.localInit(); // force initialization
h = ThreadLocalRandom.getProbe();
wasUncontended = true; // 重新假设未发生争用
}
boolean collide = false; // 是否要给counterCells扩容的标志
for (;;) {
CounterCell[] as; CounterCell a; int n; long v;
// 1.counterCells非空且长度大于0,用它来计数
if ((as = counterCells) != null && (n = as.length) > 0) {
... // 详见4.2节
}
// 2.counterCells为空或长度为0,初始化counterCells
else if (cellsBusy == 0 && counterCells == as &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
// 获取cellsBusy锁
boolean init = false;
try { // Initialize table
if (counterCells == as) {
CounterCell[] rs = new CounterCell[2]; // 初始长度为2
rs[h & 1] = new CounterCell(x);
counterCells = rs;
init = true;
}
} finally {
cellsBusy = 0;
}
if (init)
break;
}
// 3.counterCells为空或长度为0,并且获取cellsBusy锁失败
// 则会再次尝试将x累加到baseCount
else if (U.compareAndSwapLong(this, BASECOUNT, v = baseCount, v + x))
break; // Fall back on using base
}
}
首先判断线程哈希值是否初始化,若未初始化,则通过 ThreadLocalRandom.localInit()
强制初始化。
魔术 0x9e3779b9 是黄金分割比与 2^32 的乘积,使用它能够有效分散哈希分布。
private static final int PROBE_INCREMENT = 0x9e3779b9; // 魔数
// 竞争不激烈,可用原子类
private static final AtomicInteger probeGenerator = new AtomicInteger();
static final void localInit() {
int p = probeGenerator.addAndGet(PROBE_INCREMENT); // 每次累加魔数
int probe = (p == 0) ? 1 : p; // skip 0
long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
Thread t = Thread.currentThread();
UNSAFE.putLong(t, SEED, seed);
UNSAFE.putInt(t, PROBE, probe); // 写回
}
初始化后,将 wasUncontended 置为 true,由于 probe 改变了,可以假设未发生争用。
接着是一个复杂的循环,但是总体框架是清晰的:
- counterCells 非空且长度大于 0:用它来计数;
- counterCells 为空或长度为 0,初始化 counterCells;
- counterCells 为空或长度为 0,并且获取 cellsBusy 锁失败:再次尝试将 x 累加到 baseCount。
第二个和第三个分支比较好理解,关键在于第一个分支如何用 counterCells 计数,又是如何处理 counterCells 哈希冲突的。
4.2 counterCells#
循环:
- 若 counterCells[probe] 为空,插入新的 CounterCell 对象:
- 创建成功:已记录 x,跳出;
- 创建失败:进入下一个循环。
- 若 wasUncontended 为 false,存在竞争:
- wasUncontended 置为 true;
- 执行线程 rehash,进入下一个循环。
- 若成功通过 CAS 将 x 累加进 counterCells[probe]:跳出。
- 若 counterCells 正在扩容或长度大于等于处理器数:
- collide 置为 false,无法扩容;
- 执行线程 rehash,进入下一个循环。
- 若 collide 为 false:
- collide 置为 true,下一轮第三步继续失败将进行扩容;
- 执行线程 rehash,进入下一个循环。
- 若 collide 为 true:
- 尝试给 counterCells 扩容;
- 进入下一个循环。
以上就是 counterCells 计数的流程,有几个点需要说明:
- 就算进行了线程 rehash 也没法保证一定不会哈希冲突,但是如果传入值 wasUncontended 明确表示存在哈希冲突和线程冲突并存,第二步还是会主动线程 rehash。
- 第一步和第二步都只会执行一次,之后的每一次循环都会尝试第三步通过 CAS 将 x 累加进 counterCells[probe]。
- 并不是第三步失败,就立即扩容,两者之间还是会多次尝试第三步 CAS。
- 如果不能扩容,就会在第三步和第四步间来回执行,直到线程 rehash 后第三步 CAS 成功为止。
if ((as = counterCells) != null && (n = as.length) > 0) {
// 1.counterCells[probe]为空,插入新的CounterCell
if ((a = as[(n - 1) & h]) == null) {
if (cellsBusy == 0) { // Try to attach new Cell
// 乐观地提前创建了value=x的CounterCell
CounterCell r = new CounterCell(x); // Optimistic create
if (cellsBusy == 0 && // CAS获取cellsBusy锁
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
boolean created = false;
try { // Recheck under lock
CounterCell[] rs; int m, j;
// 判断有没有被其它线程初始化
if ((rs = counterCells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
cellsBusy = 0; // 释放cellsBusy锁,赋值本身具有原子性
}
if (created) // 初始化元素成功,直接退出循环
break;
continue; // Slot is now non-empty
}
}
collide = false;
}
// 2.wasUncontended为false,存在竞争,线程rehash
else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash(线程哈希)
// 3.尝试将x累加进数组元素
else if (U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))
break;
// 4.判断counterCells是否正在扩容,或数组长度是否大于等于处理器数
else if (counterCells != as || n >= NCPU)
collide = false; // At max size or stale
// 5.如果数组没有在扩容,且数组长度小于处理器数
// 此时,如果collide为false,则把它变成true
// 在下一轮循环中,如果CAS累加value继续失败,就会触发counterCells扩容
else if (!collide)
collide = true;
// 6.如果collide为true,则尝试给counterCells数组扩容
else if (cellsBusy == 0 &&
U.compareAndSwapInt(this, CELLSBUSY, 0, 1)) {
try {
if (counterCells == as) {// Expand table unless stale
CounterCell[] rs = new CounterCell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
counterCells = rs;
}
} finally {
cellsBusy = 0;
}
collide = false;
continue; // Retry with expanded table
}
// 更改当前线程的哈希值,未continue得分支都会执行
h = ThreadLocalRandom.advanceProbe(h);
}
五、读取#
既然采用了 COW 的思想,读取总是比写入简单得多。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode());
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) {
// 首节点即匹配
if ((eh = e.hash) == h) {
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
// 该桶已经被迁移,则交由节点find()方法查找
else if (eh < 0)
return (p = e.find(h, key)) != null ? p.val : null;
// 搜索桶内所有节点
while ((e = e.next) != null) {
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
当搜索的桶已经迁移时,会得到一个 ForwardingNode 类节点,通过 find()
就可以追踪到迁移后的桶(nextTable[(n - 1) & h],n=nextTable.length,h=key.hash),然后在该桶内查找。
Node<K,V> find(int h, Object k) {
// loop to avoid arbitrarily deep recursion on forwarding nodes
outer: for (Node<K,V>[] tab = nextTable;;) {
Node<K,V> e; int n;
if (k == null || tab == null || (n = tab.length) == 0 ||
(e = tabAt(tab, (n - 1) & h)) == null)
return null;
for (;;) {
int eh; K ek;
if ((eh = e.hash) == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
if (eh < 0) {
if (e instanceof ForwardingNode) {
tab = ((ForwardingNode<K,V>)e).nextTable;
continue outer;
}
else
return e.find(h, k);
}
if ((e = e.next) == null)
return null;
}
}
}