新聞中心
Java 7的ConcurrenHashMap的源碼我建議大家都看看,那個版本的源碼就是Java多線程編程的教科書。在Java 7的源碼中,作者對悲觀鎖的使用非常謹慎,大多都轉(zhuǎn)換為自旋鎖加volatile獲得相同的語義,即使最后迫不得已要用,作者也會通過各種技巧減少鎖的臨界區(qū)。在上一篇文章中我們也有講到,自旋鎖在臨界區(qū)比較小的時候是一個較優(yōu)的選擇是因為它避免了線程由于阻塞而切換上下文,但本質(zhì)上它也是個鎖,在自旋等待期間只有一個線程能進入臨界區(qū),其他線程只會自旋消耗CPU的時間片。Java 8中ConcurrentHashMap的實現(xiàn)通過一些巧妙的設(shè)計和技巧,避開了自旋鎖的局限,提供了更高的并發(fā)性能。如果說Java 7版本的源碼是在教我們?nèi)绾螌⒈^鎖轉(zhuǎn)換為自旋鎖,那么在Java 8中我們甚至可以看到如何將自旋鎖轉(zhuǎn)換為無鎖的方法和技巧。

新區(qū)網(wǎng)站制作公司哪家好,找創(chuàng)新互聯(lián)建站!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、響應(yīng)式網(wǎng)站設(shè)計等網(wǎng)站項目制作,到程序開發(fā),運營維護。創(chuàng)新互聯(lián)建站成立于2013年到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進行。專注于網(wǎng)站建設(shè)就選創(chuàng)新互聯(lián)建站。
把書讀薄
image
圖片來源:https://www.zhenchao.org/2019/01/31/java/cas-based-concurrent-hashmap/
在開始本文之前,大家首先在心里還是要有這樣的一張圖,如果有同學(xué)對HashMap比較熟悉,那這張圖也應(yīng)該不會陌生。事實上在整體的數(shù)據(jù)結(jié)構(gòu)的設(shè)計上Java 8的ConcurrentHashMap和HashMap基本上是一致的。
Java 7中ConcurrentHashMap為了提升性能使用了很多的編程技巧,但是引入Segment的設(shè)計還是有很大的改進空間的,Java 7中ConcurrrentHashMap的設(shè)計有下面這幾個可以改進的點:
1. Segment在擴容的時候非擴容線程對本Segment的寫操作時都要掛起等待的
2. 對ConcurrentHashMap的讀操作需要做兩次哈希尋址,在讀多寫少的情況下其實是有額外的性能損失的
3. 盡管size()方法的實現(xiàn)中先嘗試無鎖讀,但是如果在這個過程中有別的線程做寫入操作,那調(diào)用size()的這個線程就會給整個ConcurrentHashMap加鎖,這是整個ConcurrrentHashMap唯一一個全局鎖,這點對底層的組件來說還是有性能隱患的
4. 極端情況下(比如客戶端實現(xiàn)了一個性能很差的哈希函數(shù))get()方法的復(fù)雜度會退化到O(n)。
針對1和2,在Java 8的設(shè)計是廢棄了Segment的使用,將悲觀鎖的粒度降低至桶維度,因此調(diào)用get的時候也不需要再做兩次哈希了。size()的設(shè)計是Java 8版本中最大的亮點,我們在后面的文章中會詳細說明。至于紅黑樹,這篇文章仍然不做過多闡述。接下來的篇幅會深挖細節(jié),把書讀厚,涉及到的模塊有:初始化,put方法, 擴容方法transfer以及size()方法,而其他模塊,比如hash函數(shù)等改變較小,故不再深究。
準(zhǔn)備知識
ForwardingNode
- static final class ForwardingNode
extends Node { - final Node
[] nextTable; - ForwardingNode(Node
[] tab) { - // MOVED = -1,F(xiàn)orwardingNode的哈希值為-1
- super(MOVED, null, null, null);
- this.nextTable = tab;
- }
- }
除了普通的Node和TreeNode之外,ConcurrentHashMap還引入了一個新的數(shù)據(jù)類型ForwardingNode,我們這里只展示他的構(gòu)造方法,F(xiàn)orwardingNode的作用有兩個:
- 在動態(tài)擴容的過程中標(biāo)志某個桶已經(jīng)被復(fù)制到了新的桶數(shù)組中
- 如果在動態(tài)擴容的時候有g(shù)et方法的調(diào)用,則ForwardingNode將會把請求轉(zhuǎn)發(fā)到新的桶數(shù)組中,以避免阻塞get方法的調(diào)用,F(xiàn)orwardingNode在構(gòu)造的時候會將擴容后的桶數(shù)組nextTable保存下來。
UNSAFE.compareAndSwap***
這是在Java 8版本的ConcurrentHashMap實現(xiàn)CAS的工具,以int類型為例其方法定義如下:
- /**
- * Atomically update Java variable to x if it is currently
- * holding expected.
- * @return true if successful
- */
- public final native boolean compareAndSwapInt(Object o, long offset,
- int expected,
- int x);
相應(yīng)的語義為:
如果對象o起始地址偏移量為offset的值等于expected,則將該值設(shè)為x,并返回true表明更新成功,否則返回false,表明CAS失敗
初始化
- public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
- if (!(loadFactor > 0.0f) || initialCapacity < 0 || concurrencyLevel <= 0) // 檢查參數(shù)
- throw new IllegalArgumentException();
- if (initialCapacity < concurrencyLevel)
- initialCapacity = concurrencyLevel;
- long size = (long)(1.0 + (long)initialCapacity / loadFactor);
- int cap = (size >= (long)MAXIMUM_CAPACITY) ?
- MAXIMUM_CAPACITY : tableSizeFor((int)size); // tableSizeFor,求不小于size的 2^n的算法,jdk1.8的HashMap中說過
- this.sizeCtl = cap;
- }
即使是最復(fù)雜的一個初始化方法代碼也是比較簡單的,這里我們只需要注意兩個點:
- concurrencyLevel在Java 7中是Segment數(shù)組的長度,由于在Java 8中已經(jīng)廢棄了Segment,因此concurrencyLevel只是一個保留字段,無實際意義
- sizeCtl這個值第一次出現(xiàn),這個值如果等于-1則表明系統(tǒng)正在初始化,如果是其他負數(shù)則表明系統(tǒng)正在擴容,在擴容時sizeCtl二進制的低十六位等于擴容的線程數(shù)加一,高十六位(除符號位之外)包含桶數(shù)組的大小信息
put方法
- public V put(K key, V value) {
- return putVal(key, value, false);
- }
put方法將調(diào)用轉(zhuǎn)發(fā)到putVal方法:
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- if (key == null || value == null) throw new NullPointerException();
- int hash = spread(key.hashCode());
- int binCount = 0;
- for (Node
[] tab = table;;) { - Node
f; int n, i, fh; - // 【A】延遲初始化
- if (tab == null || (n = tab.length) == 0)
- tab = initTable();
- // 【B】當(dāng)前桶是空的,直接更新
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
- if (casTabAt(tab, i, null,
- new Node
(hash, key, value, null))) - break; // no lock when adding to empty bin
- }
- // 【C】如果當(dāng)前的桶的第一個元素是一個ForwardingNode節(jié)點,則該線程嘗試加入擴容
- else if ((ffh = f.hash) == MOVED)
- tab = helpTransfer(tab, f);
- // 【D】否則遍歷桶內(nèi)的鏈表或樹,并插入
- else {
- // 暫時折疊起來,后面詳細看
- }
- }
- // 【F】流程走到此處,說明已經(jīng)put成功,map的記錄總數(shù)加一
- addCount(1L, binCount);
- return null;
- }
從整個代碼結(jié)構(gòu)上來看流程還是比較清楚的,我用括號加字母的方式標(biāo)注了幾個非常重要的步驟,put方法依然牽扯出很多的知識點
桶數(shù)組的初始化
- private final Node
[] initTable() { - Node
[] tab; int sc; - while ((tab = table) == null || tab.length == 0) {
- if ((sc = sizeCtl) < 0)
- // 說明已經(jīng)有線程在初始化了,本線程開始自旋
- Thread.yield(); // lost initialization race; just spin
- else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
- // CAS保證只有一個線程能走到這個分支
- try {
- if ((tab = table) == null || tab.length == 0) {
- int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
- @SuppressWarnings("unchecked")
- Node
[] nt = (Node [])new Node,?>[n]; - tabtable = tab = nt;
- // sc = n - n/4 = 0.75n
- sc = n - (n >>> 2);
- }
- } finally {
- // 恢復(fù)sizeCtl > 0相當(dāng)于釋放鎖
- sizeCtl = sc;
- }
- break;
- }
- }
- return tab;
- }
在初始化桶數(shù)組的過程中,系統(tǒng)如何保證不會出現(xiàn)并發(fā)問題呢,關(guān)鍵點在于自旋鎖的使用,當(dāng)有多個線程都執(zhí)行initTable方法的時候,CAS可以保證只有一個線程能夠進入到真正的初始化分支,其他線程都是自旋等待。這段代碼中我們關(guān)注三點即可:
- 依照前文所述,當(dāng)有線程開始初始化桶數(shù)組時,會通過CAS將sizeCtl置為-1,其他線程以此為標(biāo)志開始自旋等待
- 當(dāng)桶數(shù)組初始化結(jié)束后將sizeCtl的值恢復(fù)為正數(shù),其值等于0.75倍的桶數(shù)組長度,這個值的含義和之前HashMap中的THRESHOLD一致,是系統(tǒng)觸發(fā)擴容的臨界點
- 在finally語句中對sizeCtl的操作并沒有使用CAS是因為CAS保證只有一個線程能夠執(zhí)行到這個地方
添加桶數(shù)組第一個元素
- static final
Node tabAt(Node [] tab, int i) { - return (Node
)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE); - }
- static final
boolean casTabAt(Node [] tab, int i, - Node
c, Node v) { - return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
- }
put方法的第二個分支會用tabAt判斷當(dāng)前桶是否是空的,如果是則會通過CAS寫入,tabAt通過UNSAFE接口會拿到桶中的最新元素,casTabAt通過CAS保證不會有并發(fā)問題,如果CAS失敗,則通過循環(huán)再進入其他分支
判斷是否需要新增線程擴容
- final Node
[] helpTransfer(Node [] tab, Node f) { - Node
[] nextTab; int sc; - if (tab != null && (f instanceof ForwardingNode) &&
- (nextTab = ((ForwardingNode
)f).nextTable) != null) { - int rs = resizeStamp(tab.length);
- while (nextTab == nextTable && table == tab &&
- (sc = sizeCtl) < 0) {
- // RESIZE_STAMP_SHIFT = 16
- if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
- sc == rs + MAX_RESIZERS || transferIndex <= 0)
- break;
- // 這里將sizeCtl的值自增1,表明參與擴容的線程數(shù)量+1
- if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
- transfer(tab, nextTab);
- break;
- }
- }
- return nextTab;
- }
- return table;
- }
在這個地方我們就要詳細說下sizeCtl這個標(biāo)志位了,臨時變量rs由resizeStamp這個方法返回
- static final int resizeStamp(int n) {
- // RESIZE_STAMP_BITS = 16
- return Integer.numberOfLeadingZeros(n) | (1 << (RESIZE_STAMP_BITS - 1));
- }
因為入?yún)是一個int類型的值,所有Integer.numberOfLeadingZeros(n)的返回值介于0到32之間,如果轉(zhuǎn)換成二進制
- Integer.numberOfLeadingZeros(n)的最大值是:00000000 00000000 00000000 00100000
- Integer.numberOfLeadingZeros(n)的最小值是:00000000 00000000 00000000 00000000
因此resizeStampd的返回值也就介于00000000 00000000 10000000 00000000到00000000 00000000 10000000 00100000之間,從這個返回值的范圍可以看出來resizeStamp的返回值高16位全都是0,是不包含任何信息的。因此在ConcurrrentHashMap中,會把resizeStamp的返回值左移16位拼到sizeCtl中,這就是為什么sizeCtl的高16位包含整個Map大小的原理。有了這個分析,這段代碼中比較長的if判斷也就能看懂了
- if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
- sc == rs + MAX_RESIZERS || transferIndex <= 0)
- break;
- (sc >>> RESIZE_STAMP_SHIFT) != rs保證所有線程要基于同一個舊的桶數(shù)組擴容
- transferIndex <= 0已經(jīng)有線程完成擴容任務(wù)了
至于sc == rs + 1 || sc == rs + MAX_RESIZERS這兩個判斷條件如果是細心的同學(xué)一定會覺得難以理解,這個地方確實是JDK的一個BUG,這個BUG已經(jīng)在JDK 12中修復(fù),詳細情況可以參考一下Oracle的官網(wǎng):https://bugs.java.com/bugdatabase/view_bug.do?bug_id=JDK-8214427,這兩個判斷條件應(yīng)該寫成這樣:sc == (rs << RESIZE_STAMP_SHIFT) + 1 || sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS,因為直接比較rs和sc是沒有意義的,必須要有移位操作。它表達的含義是
- sc == (rs << RESIZE_STAMP_SHIFT) + 1當(dāng)前擴容的線程數(shù)為0,即已經(jīng)擴容完成了,就不需要再新增線程擴容
- sc == (rs << RESIZE_STAMP_SHIFT) + MAX_RESIZERS參與擴容的線程數(shù)已經(jīng)到了最大,就不需要再新增線程擴容
真正擴容的邏輯在transfer方法中,我們后面會詳細看,不過有個小細節(jié)可以提前注意,如果nextTable已經(jīng)初始化了,transfer會返回nextTable的的引用,后續(xù)可以直接操作新的桶數(shù)組。
插入新值
如果桶數(shù)組已經(jīng)初始化好了,該擴容的也擴容了,并且根據(jù)哈希定位到的桶中已經(jīng)有元素了,那流程就跟普通的HashMap一樣了,唯一一點不同的就是,這時候要給當(dāng)前的桶加鎖,且看代碼:
- final V putVal(K key, V value, boolean onlyIfAbsent) {
- if (key == null || value == null) throw new NullPointerException();
- int hash = spread(key.hashCode());
- int binCount = 0;
- for (Node
[] tab = table;;) { - Node
f; int n, i, fh; - if (tab == null || (n = tab.length) == 0)// 折疊
- else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {// 折疊}
- else if ((ffh = f.hash) == MOVED)// 折疊
- else {
- V oldVal = null;
- synchronized (f) {
- // 要注意這里這個不起眼的判斷條件
- if (tabAt(tab, i) == f) {
- if (fh >= 0) { // fh>=0的節(jié)點是鏈表,否則是樹節(jié)點或者ForwardingNode
- binCount = 1;
- for (Node
e = f;; ++binCount) { - K ek;
- if (e.hash == hash &&
- ((eek = e.key) == key ||
- (ek != null && key.equals(ek)))) {
- oldVal = e.val; // 如果鏈表中有值了,直接更新
- if (!onlyIfAbsent)
- e.val = value;
- break;
- }
- Node
pred = e; - if ((ee = e.next) == null) {
- // 如果流程走到這里,則說明鏈表中還沒值,直接連接到鏈表尾部
- pred.next = new Node
(hash, key, value, null); - break;
- }
- }
- }
- // 紅黑樹的操作先略過
- }
- }
- }
- }
- // put成功,map的元素個數(shù)+1
- addCount(1L, binCount);
- return null;
- }
這段代碼中要特備注意一個不起眼的判斷條件(上下文在源碼上已經(jīng)標(biāo)注出來了):tabAt(tab, i) == f,這個判斷的目的是為了處理調(diào)用put方法的線程和擴容線程的競爭。因為synchronized是阻塞鎖,如果調(diào)用put方法的線程恰好和擴容線程同時操作同一個桶,且調(diào)用put方法的線程競爭鎖失敗,等到該線程重新獲取到鎖的時候,當(dāng)前桶中的元素就會變成一個ForwardingNode,那就會出現(xiàn)tabAt(tab, i) != f的情況。
多線程動態(tài)擴容
- private final void transfer(Node
[] tab, Node [] nextTab) { - int n = tab.length, stride;
- if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
- stride = MIN_TRANSFER_STRIDE; // subdivide range
- if (nextTab == null) { // 初始化新的桶數(shù)組
- try {
- @SuppressWarnings("unchecked")
- Node
[] nt = (Node [])new Node,?>[n << 1]; - nextTab = nt;
- } catch (Throwable ex) { // try to cope with OOME
- sizeCtl = Integer.MAX_VALUE;
- return;
- }
- nextTabnextTable = nextTab;
- transferIndex = n;
- }
- int nextn = nextTab.length;
- ForwardingNode
fwd = new ForwardingNode (nextTab); - boolean advance = true;
- boolean finishing = false; // to ensure sweep before committing nextTab
- for (int i = 0, bound = 0;;) {
- Node
f; int fh; - while (advance) {
- int nextIndex, nextBound;
- if (--i >= bound || finishing)
- advance = false;
- else if ((nextIndex = transferIndex) <= 0) {
- i = -1;
- advance = false;
- }
- else if (U.compareAndSwapInt
- (this, TRANSFERINDEX, nextIndex,
- nextBound = (nextIndex > stride ?
- nextIndex - stride : 0))) {
- bound = nextBound;
- i = nextIndex - 1;
- advance = false;
- }
- }
- if (i < 0 || i >= n || i + n >= nextn) {
- int sc;
- if (finishing) {
- nextTable = null;
- table = nextTab;
- sizeCtl = (n << 1) - (n >>> 1);
- return;
- }
- if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
- // 判斷是會否是最后一個擴容線程
- if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
- return;
- finishing = advance = true;
- i = n; // recheck before commit
- }
- }
- else if ((f = tabAt(tab, i)) == null)
- advance = casTabAt(tab, i, null, fwd);
- else if ((ffh = f.hash) == MOVED) // 只有最后一個擴容線程才有機會執(zhí)行這個分支
- advance = true; // already processed
- else { // 復(fù)制過程與HashMap類似,這里不再贅述
- synchronized (f) {
- // 折疊
- }
- }
- }
- }
在深入到源碼細節(jié)之前我們先根據(jù)下圖看一下在Java 8中ConcurrentHashMap擴容的幾個特點:
- 新的桶數(shù)組nextTable是原先桶數(shù)組長度的2倍,這與之前HashMap一致
- 參與擴容的線程也是分段將table中的元素復(fù)制到新的桶數(shù)組nextTable中
- 桶一個桶數(shù)組中的元素在新的桶數(shù)組中均勻的分布在兩個桶中,桶下標(biāo)相差n(舊的桶數(shù)組的長度),這一點依然與HashMap保持一致
image-20210424202636495
各個線程之間如何通力協(xié)作
先看一個關(guān)鍵的變量transferIndex,這是一個被volatile修飾的變量,這一點可以保證所有線程讀到的一定是最新的值。
- private transient volatile int transferIndex;
這個值會被第一個參與擴容的線程初始化,因為只有第一個參與擴容的線程才滿足條件nextTab == null
- if (nextTab == null) { // initiating
- try {
- @SuppressWarnings("unchecked")
- Node
[] nt = (Node [])new Node,?>[n << 1]; - nextTab = nt;
- } catch (Throwable ex) { // try to cope with OOME
- sizeCtl = Integer.MAX_VALUE;
- return;
- }
- nextTabnextTable = nextTab;
- transferIndex = n;
- }
在了解了transferIndex屬性的基礎(chǔ)上,上面的這個循環(huán)就好理解了
- while (advance) {
- int nextIndex, nextBound;
- // 當(dāng)bound <= i <= transferIndex的時候i自減跳出這個循環(huán)繼續(xù)干活
- if (--i >= bound || finishing)
- advance = false;
- // 擴容的所有任務(wù)已經(jīng)被認領(lǐng)完畢,本線程結(jié)束干活
- else if ((nextIndex = transferIndex) <= 0) {
- i = -1;
- advance = false;
- }
- // 否則認領(lǐng)新的一段復(fù)制任務(wù),并通過`CAS`更新transferIndex的值
- else if (U.compareAndSwapInt
- (this, TRANSFERINDEX, nextIndex,
- nextBound = (nextIndex > stride ?
- nextIndex - stride : 0))) {
- bound = nextBound;
- i = nextIndex - 1;
- advance = false;
- }
- }
transferIndex就像是一個游標(biāo),每個線程認領(lǐng)一段復(fù)制任務(wù)的時候都會通過CAS將其更新為transferIndex - stride, CAS可以保證transferIndex可以按照stride這個步長降到0。
最后一個擴容線程需要二次確認?
對于每一個擴容線程,for循環(huán)的變量i代表要復(fù)制的桶的在桶數(shù)組中的下標(biāo),這個值的上限和下限通過游標(biāo)transferIndex和步長stride計算得來,當(dāng)i減小為負數(shù),則說明當(dāng)前擴容線程完成了擴容任務(wù),這時候流程會走到這個分支:
- // i >= n || i + n >= nextn現(xiàn)在看來取不到
- if (i < 0 || i >= n || i + n >= nextn) {
- int sc;
- if (finishing) { // 【A】完成整個擴容過程
- nextTable = null;
- table = nextTab;
本文題目:Java 8 ConcurrentHashMap源碼中竟然隱藏著兩個Bug
當(dāng)前URL:http://fisionsoft.com.cn/article/djoppch.html


咨詢
建站咨詢
