新聞中心
/**
* Creates an instance of {@code ReentrantLock} with the
* given fairness policy.
*
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
對于需要公平的場景,和我們真實生活一樣,這里的 FairSync 會通過一個 CLH 隊列將請求線程排隊。

在單實例應用中,本機的 CLH 排隊就足夠了,我們現(xiàn)在切換到分布式的場景。
在分布式場景下,為了實現(xiàn)鎖的功能,就出現(xiàn)了各種分布式鎖。相比單實例場景下的鎖只能鎖定自己的實例,分布式的鎖由于統(tǒng)一的外部中間件的介入,將鎖的信息提取到獨立的外部,所以可以將多個應用實例做到互斥。
那分布式的場景下,怎么樣能保證公平呢?
和我們從單實例到分布式加鎖的思路一樣,要公平,就排隊,在統(tǒng)一的第三方處進行排隊。
來第三方這里排隊,也有一些需要注意的點。比如你在判斷當前隊列里有沒有等待,如果沒有就取鎖成功,執(zhí)行,有等待就入隊,而判斷的這個邏輯,仍然可能是并發(fā)操作,也需要做到加鎖處理。就好像你看了一眼某個飯店沒什么人,開心的去買了杯奶茶,回來一看滿了。
對于分布式鎖,基于 Redis 的 Redission 用得不少。如果換成Redis 分布式公平鎖,那基本就只有 Redission 了。
下面我通過兩段代碼,以及部分文字,描述下 Redission 的公平鎖的實現(xiàn)。
簡要概括下:
Redission 的公平鎖,是通過「 Redis + Lua 腳本」來實現(xiàn)的。在拿到一個 Redission 的 Redis 連接之后,通過 「eval()」可以執(zhí)行一段 Lua script,同時傳入一些 key 和 args。因為不管有多少 Lua 的邏輯,都是在同一個連接內(nèi),所以不會存在買完奶茶發(fā)現(xiàn)人滿了的情況。這里應用到了 Redis 的 pub/sub 功能,等待的線程,會在輪到自己時收到 Redis 的提醒,前提是需要訂閱了相應的通知。
來看加鎖的 Lua 邏輯,代碼寫的比較清楚,我也加了些對應的Redis操作以及參數(shù)的注釋。通過 list 來存儲排隊信息,同時每個等待線程都有一個超時時間,超時退出隊列。 所以eval 執(zhí)行這個的時候,返回的是一個 ttl Long 類型。表示過期時間。
[[
用于 lock 操作。
KEYS[1] = lockName
KEYS[2] = waitQueueName
KEYS[3] = timeoutName
ARGV[1] = waitTime
ARGV[2] = lockName
ARGV[3] = leaseTime
ARGV[4] = currentTime
--]]
while true do
local firstThreadId2 = redis.call("lindex", KEYS[2], 0)
if firstThreadId2 == false then
break
end
local timeout = tonumber(redis.call("zscore", KEYS[3], firstThreadId2))
if timeout <= tonumber(ARGV[4]) then
redis.call("zrem", KEYS[3], firstThreadId2)
redis.call("lpop", KEYS[2])
else
break
end
end
if
(redis.call("exists", KEYS[1]) == 0) and
((redis.call("exists", KEYS[2]) == 0) or (redis.call("lindex", KEYS[2], 0) == ARGV[2]))
then
redis.call("lpop", KEYS[2])
redis.call("zrem", KEYS[3], ARGV[2]) -- 移除有序集合中的一個或多個成員
redis.call("hset", KEYS[1], ARGV[2], 1) -- 將哈希表 key 中的字段 field 的值設為 value 。
redis.call("pexpire", KEYS[1], ARGV[1])
return nil
end
if (redis.call("hexists", KEYS[1], ARGV[2]) == 1) then -- HEXISTS key field 查看哈希表 key 中,指定的字段是否存在。
redis.call("hincrby", KEYS[1], ARGV[2], 1) -- HINCRBY key field increment 為哈希表 key 中的指定字段的整數(shù)值加上增量 increment 。
redis.call("pexpire", KEYS[1], ARGV[1]) -- Redis PEXPIRE 命令和 EXPIRE 命令的作用類似,但是它以毫秒為單位設置 key 的生存時間,而不像 EXPIRE 命令那樣,以秒為單位。
return nil
end
local firstThreadId = redis.call("lindex", KEYS[2], 0)
local ttl
if firstThreadId ~= false and firstThreadId ~= ARGV[2] then
ttl = tonumber(redis.call("zscore", KEYS[3], firstThreadId)) - tonumber(ARGV[4])
else
ttl = redis.call("pttl", KEYS[1]) -- Redis Pttl 命令以毫秒為單位返回 key 的剩余過期時間。
end
local timeout = ttl + tonumber(ARGV[3]) + tonumber(ARGV[4])
if redis.call("zadd", KEYS[3], timeout, ARGV[2]) == 1 then
redis.call("rpush", KEYS[2], ARGV[2])
end
return ttl
再來看解鎖的邏輯。我前面加鎖的一些內(nèi)容對應著看,重點在于 「publish」這里,在輪到某個線程時,nextThreadId 這個 channel 會收到通知。
這里的 threadId 是我們在加鎖和解鎖的時候都需要傳入的。如果你留意過 Java 的線程 Id 就會發(fā)現(xiàn),不同實例之間有很大概率會重復的。為了避免,各個 Client 在傳入 ThradId 的時候,除了真實的 id 外,還需要加入各個 client 對應的信息加以區(qū)分。
--[[
用于 unlock 操作。
KEYS[1] = lockName
KEYS[2] = waitQueueName
KEYS[3] = timeoutName
KEYS[4] = channelName
ARGV[1] = message
ARGV[2] = leaseTime
ARGV[3] = lockName
ARGV[3] = currentTime
--]]
while true do
local firstThreadId2 = redis.call("lindex", KEYS[2], 0)
if firstThreadId2 == false then
break
end
local timeout = tonumber(redis.call("zscore", KEYS[3], firstThreadId2))
if timeout <= tonumber(ARGV[4]) then
redis.call("zrem", KEYS[3], firstThreadId2)
redis.call("lpop", KEYS[2])
else
break
end
end
if (redis.call("exists", KEYS[1]) == 0) then
local nextThreadId = redis.call("lindex", KEYS[2], 0)
if nextThreadId ~= false then
redis.call("publish", KEYS[4] .. ":" .. nextThreadId, ARGV[1])
end
return 1
end
if (redis.call("hexists", KEYS[1], ARGV[3]) == 0) then
return nil
end
local counter = redis.call("hincrby", KEYS[1], ARGV[3], -1)
if (counter > 0) then
redis.call("pexpire", KEYS[1], ARGV[2])
return 0
end
redis.call("del", KEYS[1])
local nextThreadId = redis.call("lindex", KEYS[2], 0)
if nextThreadId ~= false then
redis.call("publish", KEYS[4] .. ":" .. nextThreadId, ARGV[1])
end
return 1
看過了解鎖邏輯后,外面eval 執(zhí)行加鎖的時候,需要對應有 sub ,才會收到這里解鎖的 pub 信息,否則就卡住了。
這里需要注意下,sub 這個功能是個阻塞操作,需要單獨的線程里執(zhí)行,通過一個 Future 來實現(xiàn)一定等待時間的 sub 功能,超時再 unsub。這塊邏輯 Redission 封裝的比較多,感興趣的可以到源碼里點點。
當前題目:怎樣實現(xiàn)一個分布式的公平鎖?
URL網(wǎng)址:http://fisionsoft.com.cn/article/djsjphh.html


咨詢
建站咨詢
