新聞中心
讀寫鎖簡(jiǎn)介
為什么要有讀鎖
有些朋友可能會(huì)有疑惑,為什么要有讀鎖,讀操作又不會(huì)修改數(shù)據(jù),多線程同時(shí)讀取相同的資源就是安全的,為什么還要加一個(gè)讀鎖呢?

舉個(gè)例子說(shuō)明,在Golang中變量的賦值不是并發(fā)安全的,比如對(duì)一個(gè)int型變量執(zhí)行count++操作,在并發(fā)下執(zhí)行就會(huì)出現(xiàn)預(yù)期之外的結(jié)果,因?yàn)閏ount++操作分為三部分:讀取count的值、將count的值加1,然后再將結(jié)果賦值給count,這不是一個(gè)原子性操作,未加鎖時(shí)在多個(gè)線程同時(shí)對(duì)該變量執(zhí)行count++操作會(huì)造成數(shù)據(jù)不一致,通過(guò)加上寫鎖可以解決這個(gè)問(wèn)題,但是在讀取的時(shí)候我們不加讀鎖會(huì)怎么樣呢?寫個(gè)例子來(lái)看一下,只加寫鎖,不加讀鎖:
package main
import "sync"
const maxValue = 3
type test struct {
rw sync.RWMutex
index int
}
func (t *test) Get() int {
return t.index
}
func (t *test)Set() {
t.rw.Lock()
t.index++
if t.index >= maxValue{
t.index =0
}
t.rw.Unlock()
}
func main() {
t := test{}
sw := sync.WaitGroup{}
for i:=0; i < 100000; i++{
sw.Add(2)
go func() {
t.Set()
sw.Done()
}()
go func() {
val := t.Get()
if val >= maxValue{
print("get value error| value=", val, "\n")
}
sw.Done()
}()
}
sw.Wait()
}
運(yùn)行結(jié)果:
get value error| value=3
get value error| value=3
get value error| value=3
get value error| value=3
get value error| value=3
.....
每次運(yùn)行結(jié)果都是不固定的,因?yàn)槲覀儧](méi)有加讀鎖,如果允許同時(shí)讀和寫,讀取到的數(shù)據(jù)有可能就是中間狀態(tài),所以我們可以總結(jié)出來(lái)讀鎖是很有必要的,讀鎖可以防止讀到寫中間的值。
讀寫鎖的插隊(duì)策略
多個(gè)讀操作同時(shí)進(jìn)行時(shí)也是線程安全的,一個(gè)線程獲取讀鎖后,另外一個(gè)線程同樣可以獲取讀鎖,因?yàn)樽x鎖是共享的,如果一直都有線程加讀鎖,后面再有線程加寫鎖就會(huì)一直獲取不到鎖造成阻塞,這時(shí)就需要一些策略來(lái)保證鎖的公平性,避免出現(xiàn)鎖饑餓,那么Go語(yǔ)言中讀寫鎖采用的是什么插隊(duì)策略來(lái)避免饑餓問(wèn)題呢?
這里我們用一個(gè)例子來(lái)說(shuō)明一下Go語(yǔ)言的插隊(duì)策略:
假設(shè)現(xiàn)在有5個(gè)goroutine分別是G1、G2、G3、G4、G5,現(xiàn)在G1、G2獲取讀鎖成功,還沒(méi)釋放讀鎖,G3要執(zhí)行寫操作,獲取寫鎖失敗就會(huì)阻塞等待,當(dāng)前阻塞寫鎖的讀鎖goroutine數(shù)量為2:
后續(xù)G4進(jìn)來(lái)想要獲取讀鎖,這時(shí)她就會(huì)判斷如果當(dāng)前有寫鎖的goroutine正在阻塞等待,為了避免寫鎖饑餓,那這個(gè)G4也會(huì)進(jìn)入阻塞等待,后續(xù)G5進(jìn)來(lái)想要獲取寫鎖,因?yàn)镚3在占用互斥鎖,所以G5會(huì)進(jìn)入自旋/休眠 阻塞等待;
現(xiàn)在G1、G2釋放了讀鎖,當(dāng)釋放讀鎖是判斷如果阻塞寫鎖goroutine的讀鎖goroutine數(shù)量為0了并且有寫鎖等待就會(huì)喚醒正在阻塞等待的寫鎖G3,G3得到了喚醒:
G3處理完寫操作后會(huì)釋放寫鎖,這一步會(huì)同時(shí)喚醒等待的讀鎖/寫鎖的goroutine,至于G4、G5誰(shuí)能先獲取鎖就看誰(shuí)比較快了,就像搶媳婦一樣,先下手的先得呀。
讀寫鎖的實(shí)現(xiàn)
接下來(lái)我們就深入源碼分析一下,先看一下RWMutex結(jié)構(gòu)都有啥:
type RWMutex struct {
w Mutex // held if there are pending writers
writerSem uint32 // semaphore for writers to wait for completing readers
readerSem uint32 // semaphore for readers to wait for completing writers
readerCount int32 // number of pending readers
readerWait int32 // number of departing readers
}
- w:復(fù)用互斥鎖提供的能力;
- writerSem:寫操作goroutine阻塞等待信號(hào)量,當(dāng)阻塞寫操作的讀操作goroutine釋放讀鎖時(shí),通過(guò)該信號(hào)量通知阻塞的寫操作的goroutine;
- readerSem:讀操作goroutine阻塞等待信號(hào)量,當(dāng)寫操作goroutine釋放寫鎖時(shí),通過(guò)該信號(hào)量通知阻塞的讀操作的goroutine;
- redaerCount:當(dāng)前正在執(zhí)行的讀操作goroutine數(shù)量;
- readerWait:當(dāng)寫操作被阻塞時(shí)等待的讀操作goroutine個(gè)數(shù)。
讀鎖
讀鎖的對(duì)應(yīng)方法如下:
func (rw *RWMutex) RLock() {
// 原子操作readerCount 只要值不是負(fù)數(shù)就表示獲取讀鎖成功
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 有一個(gè)正在等待的寫鎖,為了避免饑餓后面進(jìn)來(lái)的讀鎖進(jìn)行阻塞等待
runtime_SemacquireMutex(&rw.readerSem, false, 0)
}
}精簡(jiǎn)了競(jìng)態(tài)檢測(cè)的方法,讀鎖方法就只有兩行代碼了,邏輯如下:
使用原子操作更新readerCount,將readercount值加1,只要原子操作后值不為負(fù)數(shù)就表示加讀鎖成功,如果值為負(fù)數(shù)表示已經(jīng)有寫鎖獲取互斥鎖成功,寫鎖goroutine正在等待或運(yùn)行,所以為了避免饑餓后面進(jìn)來(lái)的讀鎖要進(jìn)行阻塞等待,調(diào)用runtime_SemacquireMutex阻塞等待。
非阻塞加讀鎖
Go語(yǔ)言在1.18中引入了非阻塞加讀鎖的方法:
func (rw *RWMutex) TryRLock() bool {
for {
// 讀取readerCount值能知道當(dāng)前是否有寫鎖在阻塞等待,如果值為負(fù)數(shù),那么后面的讀鎖就會(huì)被阻塞住
c := atomic.LoadInt32(&rw.readerCount)
if c < 0 {
if race.Enabled {
race.Enable()
}
return false
}
// 嘗試獲取讀鎖,for循環(huán)不斷嘗試
if atomic.CompareAndSwapInt32(&rw.readerCount, c, c+1) {
if race.Enabled {
race.Enable()
race.Acquire(unsafe.Pointer(&rw.readerSem))
}
return true
}
}
}因?yàn)樽x鎖是共享的,在沒(méi)有寫鎖阻塞等待時(shí)多個(gè)線程可以同時(shí)獲取,所以原子性操作可能會(huì)失敗,這里采用for循環(huán)來(lái)增加嘗試次數(shù),很是巧妙。
釋放讀鎖
釋放讀鎖代碼主要分為兩部分,第一部分:
func (rw *RWMutex) RUnlock() {
// 將readerCount的值減1,如果值等于等于0直接退出即可;否則進(jìn)入rUnlockSlow處理
if r := atomic.AddInt32(&rw.readerCount, -1); r < 0 {
// Outlined slow-path to allow the fast-path to be inlined
rw.rUnlockSlow(r)
}
}我們都知道readerCount的值代表當(dāng)前正在執(zhí)行的讀操作goroutine數(shù)量,執(zhí)行遞減操作后的值大于等于0表示當(dāng)前沒(méi)有異常場(chǎng)景或?qū)戞i阻塞等待,所以直接退出即可,否則需要處理這兩個(gè)邏輯:
rUnlockSlow邏輯如下:
func (rw *RWMutex) rUnlockSlow(r int32) {
// r+1等于0表示沒(méi)有加讀鎖就釋放讀鎖,異常場(chǎng)景要拋出異常
// r+1 == -rwmutexMaxReaders 也表示沒(méi)有加讀鎖就是釋放讀鎖
// 因?yàn)閷戞i加鎖成功后會(huì)將readerCout的值減去rwmutexMaxReaders
if r+1 == 0 || r+1 == -rwmutexMaxReaders {
race.Enable()
throw("sync: RUnlock of unlocked RWMutex")
}
// 如果有寫鎖正在等待讀鎖時(shí)會(huì)更新readerWait的值,所以一步遞減rw.readerWait值
// 如果readerWait在原子操作后的值等于0了說(shuō)明當(dāng)前阻塞寫鎖的讀鎖都已經(jīng)釋放了,需要喚醒等待的寫鎖
if atomic.AddInt32(&rw.readerWait, -1) == 0 {
// The last reader unblocks the writer.
runtime_Semrelease(&rw.writerSem, false, 1)
}
}解讀一下這段代碼:
- r+1等于0說(shuō)明當(dāng)前goroutine沒(méi)有加讀鎖就進(jìn)行釋放讀鎖操作,屬于非法操作;
- r+1 == -rwmutexMaxReaders 說(shuō)明寫鎖加鎖成功了會(huì)將readerCount的減去rwmutexMaxReaders變成負(fù)數(shù),如果此前沒(méi)有加讀鎖,那么直接釋放讀鎖就會(huì)造成這個(gè)等式成立,也屬于沒(méi)有加讀鎖就進(jìn)行釋放讀鎖操作,屬于非法操作;
- readerWait代表寫操作被阻塞時(shí)讀操作的goroutine數(shù)量,如果有寫鎖正在等待時(shí)就會(huì)更新readerWait的值,讀鎖釋放鎖時(shí)需要readerWait進(jìn)行遞減,如果遞減后等于0說(shuō)明當(dāng)前阻塞寫鎖的讀鎖都已經(jīng)釋放了,需要喚醒等待的寫鎖。(看下文寫鎖的代碼就呼應(yīng)上了)。
寫鎖
寫鎖對(duì)應(yīng)的方法如下:
const rwmutexMaxReaders = 1 << 30
func (rw *RWMutex) Lock() {
// First, resolve competition with other writers.
// 寫鎖也就是互斥鎖,復(fù)用互斥鎖的能力來(lái)解決與其他寫鎖的競(jìng)爭(zhēng)
// 如果寫鎖已經(jīng)被獲取了,其他goroutine在獲取寫鎖時(shí)會(huì)進(jìn)入自旋或者休眠
rw.w.Lock()
// 將readerCount設(shè)置為負(fù)值,告訴讀鎖現(xiàn)在有一個(gè)正在等待運(yùn)行的寫鎖(獲取互斥鎖成功)
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders) + rwmutexMaxReaders
// 獲取互斥鎖成功并不代表goroutine獲取寫鎖成功,我們默認(rèn)最大有2^30的讀操作數(shù)目,減去這個(gè)最大數(shù)目
// 后仍然不為0則表示前面還有讀鎖,需要等待讀鎖釋放并更新寫操作被阻塞時(shí)等待的讀操作goroutine個(gè)數(shù);
if r != 0 && atomic.AddInt32(&rw.readerWait, r) != 0 {
runtime_SemacquireMutex(&rw.writerSem, false, 0)
}
}
代碼量不是很大,但是理解起來(lái)還有一點(diǎn)復(fù)雜,我嘗試用文字來(lái)解析一下,主要分為兩部分:
- 獲取互斥鎖,寫鎖也就是互斥鎖,這里我們復(fù)用互斥鎖mutex的加鎖能力,當(dāng)互斥鎖加鎖成功后,其他寫鎖goroutine再次嘗試獲取鎖時(shí)就會(huì)進(jìn)入自旋休眠等待;
- 判斷獲取寫鎖是否成功,這里有一個(gè)變量rwmutexMaxReaders = 1 << 30表示最大支持2^30個(gè)并發(fā)讀,互斥鎖加鎖成功后,假設(shè)2^30個(gè)讀操作都已經(jīng)釋放了讀鎖,通過(guò)原子操作將readerCount設(shè)置為負(fù)數(shù)在加上2^30,如果此時(shí)r仍然不為0說(shuō)面還有讀操作正在進(jìn)行,則寫鎖需要等待,同時(shí)通過(guò)原子操作更新readerWait字段,也就是更新寫操作被阻塞時(shí)等待的讀操作goroutine個(gè)數(shù);readerWait在上文的讀鎖釋放鎖時(shí)會(huì)進(jìn)行判斷,進(jìn)行遞減,當(dāng)前readerWait遞減到0時(shí)就會(huì)喚醒寫鎖。
非阻塞加寫鎖
Go語(yǔ)言在1.18中引入了非阻塞加鎖的方法:
func (rw *RWMutex) TryLock() bool {
// 先判斷獲取互斥鎖是否成功,沒(méi)有成功則直接返回false
if !rw.w.TryLock() {
if race.Enabled {
race.Enable()
}
return false
}
// 互斥鎖獲取成功了,接下來(lái)就判斷是否是否有讀鎖正在阻塞該寫鎖,如果沒(méi)有直接更新readerCount為
// 負(fù)數(shù)獲取寫鎖成功;
if !atomic.CompareAndSwapInt32(&rw.readerCount, 0, -rwmutexMaxReaders) {
rw.w.Unlock()
if race.Enabled {
race.Enable()
}
return false
}
return true
}
釋放寫鎖
func (rw *RWMutex) Unlock() {
// Announce to readers there is no active writer.
// 將readerCount的恢復(fù)為正數(shù),也就是解除對(duì)讀鎖的互斥
r := atomic.AddInt32(&rw.readerCount, rwmutexMaxReaders)
if r >= rwmutexMaxReaders {
race.Enable()
throw("sync: Unlock of unlocked RWMutex")
}
// 如果后面還有讀操作的goroutine則需要喚醒他們
for i := 0; i < int(r); i++ {
runtime_Semrelease(&rw.readerSem, false, 0)
}
// 釋放互斥鎖,寫操作的goroutine和讀操作的goroutine同時(shí)競(jìng)爭(zhēng)
rw.w.Unlock()
}釋放寫鎖的邏輯比較簡(jiǎn)單,釋放寫鎖會(huì)將會(huì)面的讀操作和寫操作的goroutine都喚醒,然后他們?cè)谶M(jìn)行競(jìng)爭(zhēng)。
總結(jié)
因?yàn)槲覀兩衔囊呀?jīng)分享了互斥鎖的實(shí)現(xiàn)方式,再來(lái)看讀寫鎖就輕松許多了,文末我們?cè)賮?lái)總結(jié)一下讀寫鎖:
- 讀寫鎖提供四種操作:讀上鎖,讀解鎖,寫上鎖,寫解鎖;加鎖規(guī)則是讀讀共享,寫寫互斥,讀寫互斥,寫讀互斥;
- 讀寫鎖中的讀鎖是一定要存在的,其目的是也是為了規(guī)避原子性問(wèn)題,只有寫鎖沒(méi)有讀鎖的情況下會(huì)導(dǎo)致我們讀取到中間值;
- Go語(yǔ)言的讀寫鎖在設(shè)計(jì)上也避免了寫鎖饑餓的問(wèn)題,通過(guò)字段readerCount、readerWait進(jìn)行控制,當(dāng)寫鎖的goroutine被阻塞時(shí),后面進(jìn)來(lái)想要獲取讀鎖的goroutine也都會(huì)被阻塞住,當(dāng)寫鎖釋放時(shí),會(huì)將后面的讀操作goroutine、寫操作的goroutine都喚醒,剩下的交給他們競(jìng)爭(zhēng)吧;
- 讀鎖獲取鎖流程:
鎖空閑時(shí),讀鎖可以立馬被獲取
如果當(dāng)前有寫鎖正在阻塞,那么想要獲取讀鎖的goroutine就會(huì)被休眠
- 釋放讀鎖流程:
當(dāng)前沒(méi)有異常場(chǎng)景或?qū)戞i阻塞等待出現(xiàn)的話,則直接釋放讀鎖成功
若沒(méi)有加讀鎖就釋放讀鎖則拋出異常;
寫鎖被讀鎖阻塞等待的場(chǎng)景下,會(huì)將readerWait的值進(jìn)行遞減,readerWait表示阻塞寫操作goroutine的讀操作goroutine數(shù)量,當(dāng)readerWait減到0時(shí)則可以喚醒被阻塞寫操作的goroutine了;
- 寫鎖獲取鎖流程
寫鎖復(fù)用了mutex互斥鎖的能力,首先嘗試獲取互斥鎖,獲取互斥鎖失敗就會(huì)進(jìn)入自旋/休眠;
獲取互斥鎖成功并不代表寫鎖加鎖成功,此時(shí)如果還有占用讀鎖的goroutine,那么就會(huì)阻塞住,否則就會(huì)加寫鎖成功
- 釋放寫鎖流程
釋放寫鎖會(huì)將負(fù)值的readerCount變成正值,解除對(duì)讀鎖的互斥
喚醒當(dāng)前阻塞住的所有讀鎖
釋放互斥鎖
讀寫鎖的代碼量不多,因?yàn)槠鋸?fù)用了互斥鎖的設(shè)計(jì),針對(duì)讀寫鎖的功能多做了一些工作,理解起來(lái)比互斥鎖要容易很多,你學(xué)會(huì)了嗎?
網(wǎng)頁(yè)題目:面試官:哥們,Go語(yǔ)言的讀寫鎖了解多少?
文章分享:http://fisionsoft.com.cn/article/dphicdh.html


咨詢
建站咨詢
