今天為你帶來的是 ReentrantLock 公平鎖與非公平鎖的源碼分析,它是 Java 并發(fā)包下的一個 java.util.concurrent.locks 實現(xiàn)類,實現(xiàn)了 Lock 接口和 Serializable 接口。
初識
ReentrantLock 類有兩個構(gòu)造函數(shù),一個是默認的不帶參數(shù)的構(gòu)造函數(shù),創(chuàng)建一個默認的非公平鎖的實現(xiàn),一個是帶參數(shù)的構(gòu)造函數(shù),根據(jù)參數(shù) fair 創(chuàng)建一個公平還是非公平的鎖。
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
這里簡單的說一下公平鎖和非公平鎖的定義:
- 公平鎖:線程在同步隊列中通過先進先出(FIFO)的方式獲取鎖,每個線程最終都能獲取鎖。
- 非公平鎖:線程可以通過插隊的方式搶占鎖,搶不到鎖就進入同步隊列排隊。
NonfairSync 類和 FairSync 類繼承了 Sync 類,它們?nèi)齻€都是 ReentrantLock 的內(nèi)部類。
AbstractQueuedSynchronizer,簡稱 AQS,擁有三個核心組件:
- state:volatile 修飾,線程是否可以獲取鎖。
- Node:內(nèi)部隊列,雙向鏈表形式,沒有搶到鎖的對象就進入這個隊列。主要字段有:pre 前一個節(jié)點,next 下一個節(jié)點,thread 線程,waitStatus 線程的狀態(tài)。
- exclusiveOwnerThread:當(dāng)前搶到鎖的線程。
如下圖,簡單的了解一下 AQS。
//繼承了 AQS
abstract static class Sync extends AbstractQueuedSynchronizer
//繼承了 Sync 類,定義一個非公平鎖的實現(xiàn)
static final class NonfairSync extends Sync
//繼承了 Sync 類,定義了一個公平鎖的實現(xiàn)
static final class FairSync extends Sync
公平鎖
在分析公平鎖之前,先介紹一下 Sync 類,它是 ReentrantLock 的唯一的屬性,在構(gòu)造函數(shù)中被初始化,決定了用公平鎖還是非公平鎖的方式獲取鎖。
private final Sync sync;
用以下構(gòu)造方法創(chuàng)建一個公平鎖。
Lock lock = new ReentrantLock(true);
沿著 lock.lock()
調(diào)用情況一路往下分析。
//FairSync.lock()
final void lock() {
// 將 1 作為參數(shù),調(diào)用 AQS 的 acquire 方法獲取鎖
acquire(1);
}
acquire() 方法主要是干了 3 件事情
- tryAcquire() 嘗試獲取鎖。
- 獲取鎖失敗后,調(diào)用 addWaiter() 方法將線程封裝成 Node,加入同步隊列。
- acquireQueued() 將隊列中的節(jié)點按自旋的方式嘗試獲取鎖。
//AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
//嘗試獲取鎖,true:直接返回,false:調(diào)用 addWaiter()
// addWaiter() 將當(dāng)前線程封裝成節(jié)點
// acquireQueued() 將同步隊列中的節(jié)點循環(huán)嘗試獲取鎖
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire() 嘗試獲取鎖,如果線程本身持有鎖,則將這個線程重入鎖。
//FairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
//當(dāng)前線程
final Thread current = Thread.currentThread();
//AQS 中的狀態(tài)值
int c = getState();
//無線程占用鎖
if (c == 0) {
//當(dāng)前線程 用 cas 的方式設(shè)置狀態(tài)為 1
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
//當(dāng)前線程成功設(shè)置 state 為 1,將當(dāng)前線程放入 AbstractOwnableSynchronizer 的 exclusiveOwnerThread 變量中
setExclusiveOwnerThread(current);
return true;
}
}
//當(dāng)前線程本來就持有鎖,進入重入邏輯,返回 true
else if (current == getExclusiveOwnerThread()) {
//將 state + 1
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
// 設(shè)置 state 變量,當(dāng)前線程持有鎖,不需要用 CAS 設(shè)置 state
setState(nextc);
return true;
}
//當(dāng)前線程獲取鎖失敗
return false;
}
hasQueuedPredecessors() 這個方法比較有紳士風(fēng)度,在 tryAcquire() 方法中被第一個調(diào)用,它謙讓比自己排隊長的線程。
//AbstractQueuedSynchronizer.hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() {
Node t = tail;
Node h = head;
Node s;
// 如果首節(jié)點獲取到了鎖,第二個節(jié)點不是當(dāng)前線程,返回 true,否則返回 false
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
addWaiter() 方法就是將獲取鎖失敗的線程加入到同步隊列尾部。
//AbstractOwnableSynchronizer.addWaiter()
private Node addWaiter(Node mode) {
//將當(dāng)前線程封裝成一個節(jié)點
Node node = new Node(Thread.currentThread(), mode);
//將新節(jié)點加入到同步隊列中
Node pred = tail;
if (pred != null) {
node.prev = pred;
// CAS 設(shè)置尾節(jié)點是新節(jié)點
if (compareAndSetTail(pred, node)) {
pred.next = node;
//返回新的節(jié)點
return node;
}
}
//沒有將尾節(jié)點設(shè)置為新節(jié)點
enq(node);
return node;
}
//AbstractQueuedSynchronizer.enq()
private Node enq(final Node node) {
//自旋
for (;;) {
//尾節(jié)點為 null,創(chuàng)建新的同步隊列
Node t = tail;
if (t == null) {
if (compareAndSetHead(new Node()))
tail = head;
} else {
//尾節(jié)點不為 null,CAS方式將新節(jié)點的前一個節(jié)點設(shè)置為尾節(jié)點
node.prev = t;
if (compareAndSetTail(t, node)) {
//舊的尾節(jié)點的下一個節(jié)點為新節(jié)點
t.next = node;
return t;
}
}
}
}
acquireQueued() 方法當(dāng)節(jié)點為首節(jié)點的時候,再次調(diào)用 tryAcquire() 獲取鎖,否則就阻塞線程,等待被喚醒。
//AbstractQueuedSynchronizer.acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
//失敗標(biāo)識
boolean failed = true;
try {
//中斷標(biāo)識
boolean interrupted = false;
//自旋
for (;;) {
//獲取前一個節(jié)點
final Node p = node.predecessor();
//如果前一個節(jié)點是首節(jié)點,輪到當(dāng)前線程獲取鎖
//tryAcquire() 嘗試獲取鎖
if (p == head && tryAcquire(arg)) {
//獲取鎖成功,將當(dāng)前節(jié)點設(shè)置為首節(jié)點
setHead(node);
//將前一個節(jié)點的 Next 設(shè)置為 null
p.next = null;
//獲取鎖成功
failed = false;
return interrupted;
}
//是否需要阻塞
if (shouldParkAfterFailedAcquire(p, node) &&
//阻塞的方法
parkAndCheckInterrupt())
//中斷標(biāo)識設(shè)為 true
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
shouldParkAfterFailedAcquire() 線程是否需要被阻塞,更改線程的 waitStatus 為 SIGNAL。parkAndCheckInterrupt() 實現(xiàn)真正的阻塞線程。
//AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前一個節(jié)點的 waitStatus 狀態(tài),默認狀態(tài)為 0,第一次進入必然走 else
int ws = pred.waitStatus;
// 第二次,直接返回 true
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
//將waitStatus 狀態(tài)設(shè)置為 SIGNAL
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//AbstractQueuedSynchronizer.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
//阻塞當(dāng)前線程
LockSupport.park(this);
return Thread.interrupted();
}
以上就是公平鎖獲取鎖的全部過程,總結(jié)一下公平鎖獲取鎖的過程:
- 當(dāng)前線程調(diào)用 tryAcquire() 獲取鎖,成功則返回。
- 調(diào)用 addWaiter(),將線程封裝成 Node 節(jié)點加入同步隊列。
- acquireQueued() 自旋嘗試獲取鎖,成功則返回。
- shouldParkAfterFailedAcquire() 將線程設(shè)置為等待喚醒狀態(tài),阻塞當(dāng)前線程。
- 如果線程被喚醒,嘗試獲取鎖,成功則返回,失敗則繼續(xù)阻塞。
非公平鎖
用默認的構(gòu)造方式創(chuàng)建一個非公平鎖。lock() 方法上來就嘗試搶占鎖,失敗則調(diào)用 acquire() 方法。
//NonfairSync.lock()
final void lock() {
//CAS 設(shè)置 state 為 1
if (compareAndSetState(0, 1))
//將當(dāng)前線程放入 AbstractOwnableSynchronizer 的 exclusiveOwnerThread 變量中
setExclusiveOwnerThread(Thread.currentThread());
else
//設(shè)置失敗,參考上一節(jié)公平鎖的 acquire()
acquire(1);
}
nonfairTryAcquire() 就沒有紳士風(fēng)度了,沒有了公平鎖 hasQueuedPredecessors() 方法。
//NonfairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
//調(diào)用 ReentrantLock 的 nonfairTryAcquire()
return nonfairTryAcquire(acquires);
}
//ReentrantLock.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
//如果 state 變量為 0,用 CAS 設(shè)置 state 的值
if (c == 0) {
if (compareAndSetState(0, acquires)) {
//將當(dāng)前線程放入 AbstractOwnableSynchronizer 的 exclusiveOwnerThread 變量中
setExclusiveOwnerThread(current);
return true;
}
}
//當(dāng)前線程本來就持有鎖,進入重入邏輯,返回 true
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
//設(shè)置 state 變量
setState(nextc);
return true;
}
return false;
}
以上就是非公平鎖獲取鎖,總結(jié)一下非公平鎖獲取鎖的過程:
- lock() 第一次嘗試獲取鎖,成功則返回。
- nonfairTryAcquire() 再次嘗試獲取鎖。
- 失敗則調(diào)用 addWaiter() 封裝線程為 Node 節(jié)點加入同步隊列。
- acquireQueued() 自旋嘗試獲取鎖,成功則返回。
- shouldParkAfterFailedAcquire() 將線程設(shè)置為等待喚醒狀態(tài),阻塞當(dāng)前線程。
- 如果線程被喚醒,嘗試獲取鎖,成功則返回,失敗則繼續(xù)阻塞。
公平鎖和非公平鎖對比
在下圖源碼中可以看出,公平鎖多了一個 !hasQueuedPredecessors()
用來判斷是否有其他線程比當(dāng)前線程在同步隊列中排隊時間更長。除此之外,非公平鎖在初始時就有 2 次獲取鎖的機會,然后再到同步隊列中排隊。
unlock() 釋放鎖
獲取鎖之后必須得釋放,同一個線程不管重入了幾次鎖,必須得釋放幾次鎖,不然 state 變量將不會變成 0,鎖被永久占用,其他線程將永遠也獲取不到鎖。
//ReentrantLock.unlock()
public void unlock() {
sync.release(1);
}
//AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
//調(diào)用 Sync 的 tryRelease()
if (tryRelease(arg)) {
Node h = head;
//首節(jié)點不是 null,首節(jié)點的 waitStatus 是 SIGNAL
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//Sync.tryRelease()
protected final boolean tryRelease(int releases) {
//state 變量減去 1
int c = getState() - releases;
//當(dāng)前線程不是占有鎖的線程,異常
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//state 變量成了 0
if (c == 0) {
free = true;
//將當(dāng)前占有的線程設(shè)置為 null
setExclusiveOwnerThread(null);
}
//設(shè)置 state 變量
setState(c);
return free;
}
//AbstractQueuedSynchronizer.unparkSuccessor()
private void unparkSuccessor(Node node) {
//節(jié)點的 waitStatus
int ws = node.waitStatus;
//為 SIGNAL 的時候,CAS 的方式更新為初始值 0
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//獲取下一個節(jié)點
Node s = node.next;
//下一個節(jié)點為空,或者 waitStatus 狀態(tài)為 CANCELLED
if (s == null || s.waitStatus > 0) {
s = null;
//從最后一個節(jié)點開始找出 waitStatus 不是 CANCELLED 的節(jié)點
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//下一個節(jié)點不是 null,喚醒它
if (s != null)
LockSupport.unpark(s.thread);
}
釋放鎖的邏輯就是 state 必須被減去 1 直到為 0,才可以喚醒下一個線程。
總結(jié)
ReentrantLock 主要是防止資源的使用沖突,保證同一個時間只能有一個線程在使用資源。比如:文件操作,同步發(fā)送消息等等。
本文分析了 ReentrantLock 的公平鎖和非公平鎖以及釋放鎖的原理,可以得出非公平鎖的效率比公平鎖效率高,非公平鎖初始時會 2 次獲取鎖,如果成功可以減少線程切換帶來的損耗。在非公平模式下,線程可能一直搶占不到鎖。
-
接口
+關(guān)注
關(guān)注
33文章
8684瀏覽量
151622 -
源碼
+關(guān)注
關(guān)注
8文章
651瀏覽量
29346 -
函數(shù)
+關(guān)注
關(guān)注
3文章
4344瀏覽量
62849 -
線程
+關(guān)注
關(guān)注
0文章
505瀏覽量
19718
發(fā)布評論請先 登錄
相關(guān)推薦
評論