在开发中,我们经常使用 HashMap 容器来存储 K-V 键值对,但是在并发多线程的情况下,HashMap 容器又是不安全的,因为在 put 元素的时候,如果触发扩容操作,也就是 rehash ,就会将原数组的内容重新 hash 到新的扩容数组中,但是在扩容这个过程中,其他线程也在进行 put 操作,如果这两个元素 hash 值相同,可能出现同时在同一数组下用链表表示,造成闭环,导致在get时会出现死循环,所以HashMap是线程不安全的 。
那有没有安全的 Map 容器呢?有的,目前 JDK 中提供了三种安全的 Map 容器:
HashTable
Collections.SynchronizedMap(同步包装器提供的方法)
ConcurrentHashMap
先来看看前两种容器,它们都是利用非常粗粒度的同步方式,在高并发情况下,性能比较低下。Hashtable 是在 put、get、size 等各种方法加上“synchronized” 锁来保证安全,这就导致了所有并发操作都要竞争同一把锁,一个线程在进行同步操作时,其他线程只能等待,大大降低了并发操作的效率。
再来看看 Collections 提供的同步包装器 SynchronizedMap ,我们可以先来看看 SynchronizedMap 的源代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private static class SynchronizedMap <K ,V > implements Map <K ,V >, Serializable { private static final long serialVersionUID = 1978198479659022715L ; private final Map<K,V> m; final Object mutex; SynchronizedMap(Map<K,V> m) { this .m = Objects.requireNonNull(m); mutex = this ; } SynchronizedMap(Map<K,V> m, Object mutex) { this .m = m; this .mutex = mutex; } public int size () { synchronized (mutex) {return m.size();} } public boolean isEmpty () { synchronized (mutex) {return m.isEmpty();} } public boolean containsKey (Object key) { synchronized (mutex) {return m.containsKey(key);} } public boolean containsValue (Object value) { synchronized (mutex) {return m.containsValue(value);} } public V get (Object key) { synchronized (mutex) {return m.get(key);} } public V put (K key, V value) { synchronized (mutex) {return m.put(key, value);} } public V remove (Object key) { synchronized (mutex) {return m.remove(key);} } public void putAll (Map<? extends K, ? extends V> map) { synchronized (mutex) {m.putAll(map);} } public void clear () { synchronized (mutex) {m.clear();} } ...... }
从源码中,我们可以看出 SynchronizedMap 虽然方法没有加 synchronized 锁,但是利用了“this”作为互斥的 mutex,所以在严格意义上 SynchronizedMap 跟 HashTable 一样,并没有实际的改进。
第三个 ConcurrentHashMap 也是这篇文章的主角,它相对前两种安全的 Map 容器来说,在设计和思想上有较大的变化,也极大的提高了 Map 的并发效率。就 ConcurrentHashMap 容器本身的实现来说,版本之间就会产生较大的差异,典型的就是 JDK1.7 和 JDK1.8 这两个版本,可以说是发生了翻天覆地的变化,在本文中也会介绍这两个版本的 ConcurrentHashMap 实现,主要的重点放在 JDK 1.8 版本上,我个人觉得 JDK 1.7 已经成为了过去式,没必要深入研究。
ConcurrentHashMap 在 JDK 1.7 中的实现 在 JDK 1.7 版本及之前的版本中,ConcurrentHashMap 为了解决 HashTable 会锁住整个 hash 表的问题,提出了分段锁的解决方案 ,分段锁就是将一个大的 hash 表分解成若干份小的 hash 表,需要加锁时就针对小的 hash 表进行加锁,从而来提升 hash 表的性能。JDK1.7 中的 ConcurrentHashMap 引入了 Segment 对象,将整个 hash 表分解成一个一个的 Segment 对象,每个 Segment 对象呢可以看作是一个细粒度的 HashMap。
Segment 对象继承了 ReentrantLock 类,因为 Segment 对象它就变成了一把锁,这样就可以保证数据的安全。 在 Segment 对象中通过 HashEntry 数组来维护其内部的 hash 表。每个 HashEntry 就代表了 map 中的一个 K-V,如果发生 hash 冲突时,在该位置就会形成链表。
JDK1.7 中,ConcurrentHashMap 的整体结构可以描述为下图的样子:
我们对 ConcurrentHashMap 最关心的地方莫过于如何解决 HashMap put 时候扩容引起的不安全问题?一起来看看 JDK1.7 中 ConcurrentHashMap 是如何解决这个问题的,我们先从 put 方法开始:
1 2 3 4 5 6 7 8 9 10 11 12 13 public V put (K key, V value) { Segment<K,V> s; if (value == null ) throw new NullPointerException(); int hash = hash(key.hashCode()); int j = (hash >>> segmentShift) & segmentMask; if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null ) s = ensureSegment(j); return s.put(key, hash, value, false ); }
在 put 方法中,首先是通过二次哈希减小哈希冲突的可能行,根据 hash 值以 Unsafe 调用方式,直接获取相应的 Segment,最终将数据添加到容器中是由 segment对象的 put 方法来完成。Segment对象的 put 方法源代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 final V put (K key, int hash, V value, boolean onlyIfAbsent) { ConcurrentHashMap.HashEntry<K,V> node = tryLock() ? null : scanAndLockForPut(key, hash, value); V oldValue; try { ConcurrentHashMap.HashEntry<K,V>[] tab = table; int index = (tab.length - 1 ) & hash; ConcurrentHashMap.HashEntry<K,V> first = entryAt(tab, index); for (ConcurrentHashMap.HashEntry<K,V> e = first;;) { if (e != null ) { K k; if ((k = e.key) == key || (e.hash == hash && key.equals(k))) { oldValue = e.value; if (!onlyIfAbsent) { e.value = value; ++modCount; } break ; } e = e.next; } else { if (node != null ) node.setNext(first); else node = new ConcurrentHashMap.HashEntry<K,V>(hash, key, value, first); int c = count + 1 ; if (c > threshold && tab.length < MAXIMUM_CAPACITY) rehash(node); else setEntryAt(tab, index, node); ++modCount; count = c; oldValue = null ; break ; } } } finally { unlock(); } return oldValue; }
由于 Segment 对象本身就是一把锁,所以在新增数据的时候,相应的 Segment对象块是被锁住的,其他线程并不能操作这个 Segment 对象,这样就保证了数据的安全性,在扩容时也是这样的,在 JDK1.7 中的 ConcurrentHashMap扩容只是针对 Segment 对象中的 HashEntry 数组进行扩容,还是因为 Segment 对象是一把锁,所以在 rehash 的过程中,其他线程无法对 segment 的 hash 表做操作,这就解决了 HashMap 中 put 数据引起的闭环问题 。
关于 JDK1.7 中的 ConcurrentHashMap 就聊这么多,我们只需要直到在 JDK1.7 中 ConcurrentHashMap 采用分段锁 的方式来解决 HashMap 不安全问题。
ConcurrentHashMap 在 JDK1.8 中的实现 在 JDK1.8 中 ConcurrentHashMap 又发生了翻天覆地的变化,从实现的代码量上就可以看出来,在 1.7 中不到 2000行代码,而在 1.8 中已经 6000多行代码了 。废话不多说,我们来看看有那些变化。
先从容器安全说起,在容器安全上,1.8 中的 ConcurrentHashMap 放弃了 JDK1.7 中的分段技术,而是采用了 CAS 机制 + synchronized 来保证并发安全性,但是在 ConcurrentHashMap 实现里保留了 Segment 定义,这仅仅是为了保证序列化时的兼容性而已,并没有任何结构上的用处。 这里插播个 CAS 机制的知识点:
CAS 机制 CAS 典型的应用莫过于 AtomicInteger 了,CAS 属于原子操作的一种,能够保证一次读写操作是原子的。CAS 通过将内存中的值与期望值进行比较,只有在两者相等时才会对内存中的值进行修改,CAS 是在保证性能的同时提供并发场景下的线程安全性。在 Java 中 CAS 实现位于 sun.misc.Unsafe 类中,该类中定义了大量的 native 方法,CAS 的实现有以下几个方法:
1 2 3 public final native boolean compareAndSwapObject (Object o, long offset, Object expected, Object x) ;public final native boolean compareAndSwapInt (Object o, long offset, int expected, int x) ;public final native boolean compareAndSwapLong (Object o, long offset, long expected, long x) ;
我们只能看到定义,并不能看到具体的实现,具体的实现依赖于操作系统,我们就不去管这些了,简单了解方法里面的参数是啥意思就行了:
o :目标操作对象
offset :目标操作数内存偏移地址
expected :期望值
x :更新值
CAS 机制虽然无需加锁、安全且高效,但也存在一些缺点,概括如下:
循环检查的时间可能较长,不过可以限制循环检查的次数
只能对一个共享变量执行原子操作
存在 ABA 问题(ABA 问题是指在 CAS 两次检查操作期间,目标变量的值由 A 变为 B,又变回 A,但是 CAS 看不到这中间的变换,对它来说目标变量的值并没有发生变化,一直是 A,所以 CAS 操作会继续更新目标变量的值。)
在存储结构上,JDK1.8 中 ConcurrentHashMap 放弃了 HashEntry 结构而是采用了跟 HashMap 结构非常相似,采用 Node 数组加链表(链表长度大于8时转成红黑树)的形式 ,Node 节点设计如下:
1 2 3 4 5 6 7 static class Node <K ,V > implements Map .Entry <K ,V > { final int hash; final K key; volatile V val; volatile Node<K,V> next; ...省略... }
跟 HashMap 一样 Key 字段被 final 修饰,说明在生命周期内,key 是不可变的, val 字段被 volatile 修饰了,这就保证了 val 字段的可见性。
JDK1.8 中的 ConcurrentHashMap 结构如下图所示:
在这里我提一下 ConcurrentHashMap 默认构造函数,我觉得这个地方比较有意思,ConcurrentHashMap 的默认构造函数如下:
1 2 public ConcurrentHashMap() { }
发现没这个构造函数啥事没干,为啥要这样设计?这样做的好处是实现了懒加载(lazy-load 形式),有效避免了初始化的开销,这也是 JDK1.7 中ConcurrentHashMap 被很多人抱怨的地方。
结构上的变化就聊上面的两点,跟上面一样,我们还是来看看我们关心的问题,如何解决 HashMap 扩容时不安全的问题,带着这个问题来阅读 ConcurrentHashMap 的源代码,关于 ConcurrentHashMap 的源代码,在本文中主要聊新增(putVal )和扩容(transfer )这两个方法,其他方法就不在一一介绍了。
putVal 方法 ConcurrentHashMap 新增元素并不是直接调用 putVal 方法,而是使用 put 方法
1 2 3 public V put(K key, V value) { return putVal(key, value, false); }
但是 put 方法调用了 putVal 方法,换一句话来说就是 putVal 是具体的新增方法,是 put 方法的具体实现,在 putVal 方法源码加上了注释,具体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 final V putVal (K key, V value, boolean onlyIfAbsent) { if (key == null || value == null ) throw new NullPointerException(); int hash = spread(key.hashCode()); int binCount = 0 ; for (ConcurrentHashMap.Node<K,V>[] tab = table;;) { ConcurrentHashMap.Node<K,V> f; int n, i, fh; if (tab == null || (n = tab.length) == 0 ) tab = initTable(); else if ((f = tabAt(tab, i = (n - 1 ) & hash)) == null ) { if (casTabAt(tab, i, null , new ConcurrentHashMap.Node<K,V>(hash, key, value, null ))) break ; } else if ((fh = f.hash) == MOVED) tab = helpTransfer(tab, f); else { V oldVal = null ; synchronized (f) { if (tabAt(tab, i) == f) { if (fh >= 0 ) { binCount = 1 ; for (ConcurrentHashMap.Node<K,V> e = f;; ++binCount) { K ek; if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) { oldVal = e.val; if (!onlyIfAbsent) e.val = value; break ; } ConcurrentHashMap.Node<K,V> pred = e; if ((e = e.next) == null ) { pred.next = new ConcurrentHashMap.Node<K,V>(hash, key, value, null ); break ; } } } else if (f instanceof ConcurrentHashMap.TreeBin) { ConcurrentHashMap.Node<K,V> p; binCount = 2 ; if ((p = ((ConcurrentHashMap.TreeBin<K,V>)f).putTreeVal(hash, key, value)) != null ) { oldVal = p.val; if (!onlyIfAbsent) p.val = value; } } } } if (binCount != 0 ) { if (binCount >= TREEIFY_THRESHOLD) treeifyBin(tab, i); if (oldVal != null ) return oldVal; break ; } } } addCount(1L , binCount); return null ; }
在源码中有比较详细的注释,如果你想了解详细的实现,可以逐行读源码,在这里我们来对 putVal 方法做一个总结,putVal 方法主要做了以下几件事:
第一步 、在 ConcurrentHashMap 中不允许 key val 字段为空,所以第一步先校验key value 值,key、val 两个字段都不能是 null 才继续往下走,否则直接返回一个 NullPointerException 错误,这点跟 HashMap 有区别,HashMap 是可以允许为空的。
第二步 、判断容器是否初始化,如果容器没有初始化,则调用 initTable 方法初始化,initTable 方法如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 private final Node<K,V>[] initTable() { Node<K,V>[] tab; int sc; while ((tab = table) == null || tab.length == 0 ) { if ((sc = sizeCtl) < 0 ) Thread.yield(); else if (U.compareAndSwapInt(this , SIZECTL, sc, -1 )) { try { if ((tab = table) == null || tab.length == 0 ) { int n = (sc > 0 ) ? sc : DEFAULT_CAPACITY; Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n]; table = tab = nt; sc = n - (n >>> 2 ); } } finally { sizeCtl = sc; } break ; } } return tab; }
Table 本质上就是一个 Node 数组,其初始化过程也就是对 Node 数组的初始化过程,方法中使用了 CAS 策略执行初始化操作。初始化流程为:
1、判断 sizeCtl 值是否小于 0,如果小于 0 则表示 ConcurrentHashMap 正在执行初始化操作,所以需要先等待一会,如果其它线程初始化失败还可以顶替上去 2、如果 sizeCtl 值大于等于 0,则基于 CAS 策略抢占标记 sizeCtl 为 -1,表示 ConcurrentHashMap 正在执行初始化,然后构造 table,并更新 sizeCtl 的值
第三步 、根据双哈希之后的 hash 值找到数组对应的下标位置,如果该位置未存放节点,也就是说不存在 hash 冲突,则使用 CAS 无锁的方式将数据添加到容器中,并且结束循环。
第四步 、如果并未满足第三步,则会判断容器是否正在被其他线程进行扩容操作,如果正在被其他线程扩容,则放弃添加操作,加入到扩容大军中(ConcurrentHashMap 扩容操作采用的是多线程的方式,后面我们会讲到),扩容时并未跳出死循环,这一点就保证了容器在扩容时并不会有其他线程进行数据添加操作,这也保证了容器的安全性 。
第五步 、如果 hash 冲突,则进行链表操作或者红黑树操作(如果链表树超过8,则修改链表为红黑树),在进行链表或者红黑树操作时,会使用 synchronized 锁把头节点被锁住了,保证了同时只有一个线程修改链表,防止出现链表成环 。
第六步 、进行 addCount(1L, binCount) 操作,该操作会更新 size 大小,判断是否需要扩容,addCount 方法源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 private final void addCount (long x, int check) { ConcurrentHashMap.CounterCell[] as; long b, s; if ((as = counterCells) != null || !U.compareAndSwapLong(this , BASECOUNT, b = baseCount, s = b + x)) { ConcurrentHashMap.CounterCell a; long v; int m; boolean uncontended = true ; if (as == null || (m = as.length - 1 ) < 0 || (a = as[ThreadLocalRandom.getProbe() & m]) == null || !(uncontended = U.compareAndSwapLong(a, CELLVALUE, v = a.value, v + x))) { fullAddCount(x, uncontended); return ; } if (check <= 1 ) return ; s = sumCount(); } if (check >= 0 ) { ConcurrentHashMap.Node<K,V>[] tab, nt; int n, sc; while (s >= (long )(sc = sizeCtl) && (tab = table) != null && (n = tab.length) < MAXIMUM_CAPACITY) { int rs = resizeStamp(n); if (sc < 0 ) { if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 || sc == rs + MAX_RESIZERS || (nt = nextTable) == null || transferIndex <= 0 ) break ; if (U.compareAndSwapInt(this , SIZECTL, sc, sc + 1 )) transfer(tab, nt); } else if (U.compareAndSwapInt(this , SIZECTL, sc, (rs << RESIZE_STAMP_SHIFT) + 2 )) transfer(tab, null ); s = sumCount(); } }
addCount 方法做了两个工作: 1、对 map 的 size 加一 2、检查是否需要扩容,或者是否正在扩容。如果需要扩容,就调用扩容方法,如果正在扩容,就帮助其扩容。
扩容 transfer 方法 扩容 transfer 方法是一个非常牛逼的方法,在看具体的 transfer 源码之前,我们先来了解一下什么时候会触发扩容操作,不出意外的话,以下两种情况下可能触发扩容操作 :
调用 put 方法新增元素之后,会调用 addCount 方法来更新 size 大小,并检查是否需要进行扩容,当数组元素个数达到阈值时,会触发transfer方法
触发了 tryPresize 操作, tryPresize 操作会触发扩容操作 ,有两种情况会触发 tryPresize 操作:
第一种情况:当某节点的链表元素个数达到阈值 8 时,这时候需要将链表转成红黑树,在结构转换之前会,会先判断数组长度 n 是否小于阈值MIN_TREEIFY_CAPACITY,默认是64,如果小于则会调用tryPresize方法把数组长度扩大到原来的两倍,并触发transfer方法,重新调整节点的位置。
第二种情况:在 putAll 操作时会先触发 tryPresize 操作。
tryPresize 方法源码如下:
好了,知道什么时候会触发扩容后,我们来看看 扩容 transfer 方法的源码,这也是一块硬骨头,非常难啃,希望我可以尽量的把它讲清楚,transfer 方法源码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 private final void transfer (ConcurrentHashMap.Node<K,V>[] tab, ConcurrentHashMap.Node<K,V>[] nextTab) { int n = tab.length, stride; if ((stride = (NCPU > 1 ) ? (n >>> 3 ) / NCPU : n) < MIN_TRANSFER_STRIDE) stride = MIN_TRANSFER_STRIDE; if (nextTab == null ) { try { @SuppressWarnings ("unchecked" ) ConcurrentHashMap.Node<K,V>[] nt = (ConcurrentHashMap.Node<K,V>[])new ConcurrentHashMap.Node<?,?>[n << 1 ]; nextTab = nt; } catch (Throwable ex) { sizeCtl = Integer.MAX_VALUE; return ; } nextTable = nextTab; transferIndex = n; } int nextn = nextTab.length; ConcurrentHashMap.ForwardingNode<K,V> fwd = new ConcurrentHashMap.ForwardingNode<K,V>(nextTab); boolean advance = true ; boolean finishing = false ; for (int i = 0 , bound = 0 ;;) { ConcurrentHashMap.Node<K,V> f; int fh; while (advance) { int nextIndex, nextBound; if (--i >= bound || finishing) advance = false ; else if ((nextIndex = transferIndex) <= 0 ) { i = -1 ; advance = false ; } else if (U.compareAndSwapInt (this , TRANSFERINDEX, nextIndex, nextBound = (nextIndex > stride ? nextIndex - stride : 0 ))) { bound = nextBound; i = nextIndex - 1 ; advance = false ; } } if (i < 0 || i >= n || i + n >= nextn) { int sc; if (finishing) { nextTable = null ; table = nextTab; sizeCtl = (n << 1 ) - (n >>> 1 ); return ; } if (U.compareAndSwapInt(this , SIZECTL, sc = sizeCtl, sc - 1 )) { if ((sc - 2 ) != resizeStamp(n) << RESIZE_STAMP_SHIFT) return ; finishing = advance = true ; i = n; } } else if ((f = tabAt(tab, i)) == null ) advance = casTabAt(tab, i, null , fwd); else if ((fh = f.hash) == MOVED) advance = true ; else { synchronized (f) { if (tabAt(tab, i) == f) { ConcurrentHashMap.Node<K,V> ln, hn; if (fh >= 0 ) { int runBit = fh & n; ConcurrentHashMap.Node<K,V> lastRun = f; for (ConcurrentHashMap.Node<K,V> p = f.next; p != null ; p = p.next) { int b = p.hash & n; if (b != runBit) { runBit = b; lastRun = p; } } if (runBit == 0 ) { ln = lastRun; hn = null ; } else { hn = lastRun; ln = null ; } for (ConcurrentHashMap.Node<K,V> p = f; p != lastRun; p = p.next) { int ph = p.hash; K pk = p.key; V pv = p.val; if ((ph & n) == 0 ) ln = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, ln); else hn = new ConcurrentHashMap.Node<K,V>(ph, pk, pv, hn); } setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } else if (f instanceof ConcurrentHashMap.TreeBin) { ConcurrentHashMap.TreeBin<K,V> t = (ConcurrentHashMap.TreeBin<K,V>)f; ConcurrentHashMap.TreeNode<K,V> lo = null , loTail = null ; ConcurrentHashMap.TreeNode<K,V> hi = null , hiTail = null ; int lc = 0 , hc = 0 ; for (ConcurrentHashMap.Node<K,V> e = t.first; e != null ; e = e.next) { int h = e.hash; ConcurrentHashMap.TreeNode<K,V> p = new ConcurrentHashMap.TreeNode<K,V> (h, e.key, e.val, null , null ); if ((h & n) == 0 ) { if ((p.prev = loTail) == null ) lo = p; else loTail.next = p; loTail = p; ++lc; } else { if ((p.prev = hiTail) == null ) hi = p; else hiTail.next = p; hiTail = p; ++hc; } } ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) : (hc != 0 ) ? new ConcurrentHashMap.TreeBin<K,V>(lo) : t; hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) : (lc != 0 ) ? new ConcurrentHashMap.TreeBin<K,V>(hi) : t; setTabAt(nextTab, i, ln); setTabAt(nextTab, i + n, hn); setTabAt(tab, i, fwd); advance = true ; } } } } } }
想知道具体的实现细节,请逐行读源码,如果遇到不懂得,欢迎留言交流,跟 putVal 方法一样,我们同样来对 transfer 方法进行总结,transfer 大致做了以下几件事件:
第一步 :计算出每个线程每次可以处理的个数,根据 Map 的长度,计算出每个线程(CPU)需要处理的桶(table数组的个数),默认每个线程每次处理 16 个桶,如果小于 16 个,则强制变成 16 个桶。
第二步 :对 nextTab 初始化,如果传入的新 table nextTab 为空,则对 nextTab 初始化,默认是原 table 的两倍
第三步 :引入 ForwardingNode、advance、finishing 变量来辅助扩容,ForwardingNode 表示该节点已经处理过,不需要在处理,advance 表示该线程是否可以下移到下一个桶(true:表示可以下移),finishing 表示是否结束扩容(true:结束扩容,false:未结束扩容) ,具体的逻辑就不说了
第四步 :跳过一些其他细节,直接到数据迁移这一块,在数据转移的过程中会加 synchronized 锁,锁住头节点,同步化操作,防止 putVal 的时候向链表插入数据
第五步 :进行数据迁移,如果这个桶上的节点是链表或者红黑树,则会将节点数据分为低位和高位,计算的规则是通过该节点的 hash 值跟为扩容之前的 table 容器长度进行位运算(&),如果结果为 0 ,则将数据放在新表的低位(当前 table 中为 第 i 个位置,在新表中还是第 i 个位置),结果不为 0 ,则放在新表的高位(当前 table 中为第 i 个位置,在新表中的位置为 i + 当前 table 容器的长度) 。
第六步 :如果桶挂载的是红黑树,不仅需要分离出低位节点和高位节点,还需要判断低位和高位节点在新表以链表还是红黑树的形式存放。
本文整理自
ConcurrentHashMap
仅做个人学习总结所用,遵循CC 4.0 BY-SA版权协议,如有侵权请联系删除!