0
  • 聊天消息
  • 系統(tǒng)消息
  • 評論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會員中心
創(chuàng)作中心

完善資料讓更多小伙伴認識你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

ReentrantLock公平鎖與非公平鎖的源碼分析

科技綠洲 ? 來源:Java技術(shù)指北 ? 作者:Java技術(shù)指北 ? 2023-10-13 14:13 ? 次閱讀

今天為你帶來的是 ReentrantLock 公平鎖與非公平鎖的源碼分析,它是 Java 并發(fā)包下的一個 java.util.concurrent.locks 實現(xiàn)類,實現(xiàn)了 Lock 接口和 Serializable 接口。

圖片

初識

ReentrantLock 類有兩個構(gòu)造函數(shù),一個是默認的不帶參數(shù)的構(gòu)造函數(shù),創(chuàng)建一個默認的非公平鎖的實現(xiàn),一個是帶參數(shù)的構(gòu)造函數(shù),根據(jù)參數(shù) fair 創(chuàng)建一個公平還是非公平的鎖。

public ReentrantLock() {
    sync = new NonfairSync();
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

這里簡單的說一下公平鎖和非公平鎖的定義:

  1. 公平鎖:線程在同步隊列中通過先進先出(FIFO)的方式獲取鎖,每個線程最終都能獲取鎖。
  2. 非公平鎖:線程可以通過插隊的方式搶占鎖,搶不到鎖就進入同步隊列排隊。

NonfairSync 類和 FairSync 類繼承了 Sync 類,它們?nèi)齻€都是 ReentrantLock 的內(nèi)部類。

AbstractQueuedSynchronizer,簡稱 AQS,擁有三個核心組件:

  1. state:volatile 修飾,線程是否可以獲取鎖。
  2. Node:內(nèi)部隊列,雙向鏈表形式,沒有搶到鎖的對象就進入這個隊列。主要字段有:pre 前一個節(jié)點,next 下一個節(jié)點,thread 線程,waitStatus 線程的狀態(tài)。
  3. exclusiveOwnerThread:當(dāng)前搶到鎖的線程。

如下圖,簡單的了解一下 AQS。

圖片

//繼承了 AQS
abstract static class Sync extends AbstractQueuedSynchronizer

//繼承了 Sync 類,定義一個非公平鎖的實現(xiàn)
static final class NonfairSync extends Sync

//繼承了 Sync 類,定義了一個公平鎖的實現(xiàn)
static final class FairSync extends Sync

公平鎖

在分析公平鎖之前,先介紹一下 Sync 類,它是 ReentrantLock 的唯一的屬性,在構(gòu)造函數(shù)中被初始化,決定了用公平鎖還是非公平鎖的方式獲取鎖。

private final Sync sync;

用以下構(gòu)造方法創(chuàng)建一個公平鎖。

Lock lock = new ReentrantLock(true);

沿著 lock.lock() 調(diào)用情況一路往下分析。

//FairSync.lock()
final void lock() {
    // 將 1 作為參數(shù),調(diào)用 AQS 的 acquire 方法獲取鎖
    acquire(1);
}

acquire() 方法主要是干了 3 件事情

  1. tryAcquire() 嘗試獲取鎖。
  2. 獲取鎖失敗后,調(diào)用 addWaiter() 方法將線程封裝成 Node,加入同步隊列。
  3. acquireQueued() 將隊列中的節(jié)點按自旋的方式嘗試獲取鎖。
//AbstractQueuedSynchronizer.acquire()
public final void acquire(int arg) {
    //嘗試獲取鎖,true:直接返回,false:調(diào)用 addWaiter()
    // addWaiter() 將當(dāng)前線程封裝成節(jié)點
    // acquireQueued() 將同步隊列中的節(jié)點循環(huán)嘗試獲取鎖
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

tryAcquire() 嘗試獲取鎖,如果線程本身持有鎖,則將這個線程重入鎖。

//FairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
    //當(dāng)前線程
    final Thread current = Thread.currentThread();
    //AQS 中的狀態(tài)值
    int c = getState();
    //無線程占用鎖
    if (c == 0) {
        //當(dāng)前線程 用 cas 的方式設(shè)置狀態(tài)為 1
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            //當(dāng)前線程成功設(shè)置 state 為 1,將當(dāng)前線程放入 AbstractOwnableSynchronizer 的 exclusiveOwnerThread 變量中
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //當(dāng)前線程本來就持有鎖,進入重入邏輯,返回 true
    else if (current == getExclusiveOwnerThread()) {
        //將 state + 1
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        // 設(shè)置 state 變量,當(dāng)前線程持有鎖,不需要用 CAS 設(shè)置 state
        setState(nextc);
        return true;
    }
    //當(dāng)前線程獲取鎖失敗
    return false;
}

hasQueuedPredecessors() 這個方法比較有紳士風(fēng)度,在 tryAcquire() 方法中被第一個調(diào)用,它謙讓比自己排隊長的線程。

//AbstractQueuedSynchronizer.hasQueuedPredecessors()
public final boolean hasQueuedPredecessors() {
    Node t = tail;
    Node h = head;
    Node s;
    // 如果首節(jié)點獲取到了鎖,第二個節(jié)點不是當(dāng)前線程,返回 true,否則返回 false
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

addWaiter() 方法就是將獲取鎖失敗的線程加入到同步隊列尾部。

//AbstractOwnableSynchronizer.addWaiter()
private Node addWaiter(Node mode) {
    //將當(dāng)前線程封裝成一個節(jié)點
    Node node = new Node(Thread.currentThread(), mode);
    //將新節(jié)點加入到同步隊列中
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        // CAS 設(shè)置尾節(jié)點是新節(jié)點
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            //返回新的節(jié)點
            return node;
        }
    }
    //沒有將尾節(jié)點設(shè)置為新節(jié)點
    enq(node);
    return node;
}

//AbstractQueuedSynchronizer.enq()
private Node enq(final Node node) {
    //自旋
    for (;;) {
        //尾節(jié)點為 null,創(chuàng)建新的同步隊列
        Node t = tail;
        if (t == null) {
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            //尾節(jié)點不為 null,CAS方式將新節(jié)點的前一個節(jié)點設(shè)置為尾節(jié)點
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                //舊的尾節(jié)點的下一個節(jié)點為新節(jié)點
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued() 方法當(dāng)節(jié)點為首節(jié)點的時候,再次調(diào)用 tryAcquire() 獲取鎖,否則就阻塞線程,等待被喚醒。

//AbstractQueuedSynchronizer.acquireQueued()
final boolean acquireQueued(final Node node, int arg) {
    //失敗標(biāo)識
    boolean failed = true;
    try {
        //中斷標(biāo)識
        boolean interrupted = false;
        //自旋
        for (;;) {
            //獲取前一個節(jié)點
            final Node p = node.predecessor();
            //如果前一個節(jié)點是首節(jié)點,輪到當(dāng)前線程獲取鎖
            //tryAcquire() 嘗試獲取鎖
            if (p == head && tryAcquire(arg)) {
                //獲取鎖成功,將當(dāng)前節(jié)點設(shè)置為首節(jié)點
                setHead(node);
                //將前一個節(jié)點的 Next 設(shè)置為 null
                p.next = null;
                //獲取鎖成功
                failed = false;
                return interrupted;
            }
            //是否需要阻塞
            if (shouldParkAfterFailedAcquire(p, node) &&
                //阻塞的方法
                parkAndCheckInterrupt())
                //中斷標(biāo)識設(shè)為 true
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

shouldParkAfterFailedAcquire() 線程是否需要被阻塞,更改線程的 waitStatus 為 SIGNAL。parkAndCheckInterrupt() 實現(xiàn)真正的阻塞線程。

//AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire()
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    //前一個節(jié)點的 waitStatus 狀態(tài),默認狀態(tài)為 0,第一次進入必然走 else
    int ws = pred.waitStatus;
    // 第二次,直接返回 true
    if (ws == Node.SIGNAL)
        return true;
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        //將waitStatus 狀態(tài)設(shè)置為 SIGNAL
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

//AbstractQueuedSynchronizer.parkAndCheckInterrupt()
private final boolean parkAndCheckInterrupt() {
    //阻塞當(dāng)前線程
    LockSupport.park(this);
    return Thread.interrupted();
}

以上就是公平鎖獲取鎖的全部過程,總結(jié)一下公平鎖獲取鎖的過程:

  1. 當(dāng)前線程調(diào)用 tryAcquire() 獲取鎖,成功則返回。
  2. 調(diào)用 addWaiter(),將線程封裝成 Node 節(jié)點加入同步隊列。
  3. acquireQueued() 自旋嘗試獲取鎖,成功則返回。
  4. shouldParkAfterFailedAcquire() 將線程設(shè)置為等待喚醒狀態(tài),阻塞當(dāng)前線程。
  5. 如果線程被喚醒,嘗試獲取鎖,成功則返回,失敗則繼續(xù)阻塞。

非公平鎖

用默認的構(gòu)造方式創(chuàng)建一個非公平鎖。lock() 方法上來就嘗試搶占鎖,失敗則調(diào)用 acquire() 方法。

//NonfairSync.lock()
final void lock() {
    //CAS 設(shè)置 state 為 1
    if (compareAndSetState(0, 1))
        //將當(dāng)前線程放入 AbstractOwnableSynchronizer 的 exclusiveOwnerThread 變量中
        setExclusiveOwnerThread(Thread.currentThread());
    else
        //設(shè)置失敗,參考上一節(jié)公平鎖的 acquire()
        acquire(1);
}

nonfairTryAcquire() 就沒有紳士風(fēng)度了,沒有了公平鎖 hasQueuedPredecessors() 方法。

//NonfairSync.tryAcquire()
protected final boolean tryAcquire(int acquires) {
    //調(diào)用 ReentrantLock 的 nonfairTryAcquire()
    return nonfairTryAcquire(acquires);
}

//ReentrantLock.nonfairTryAcquire()
final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    //如果 state 變量為 0,用 CAS 設(shè)置 state 的值
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            //將當(dāng)前線程放入 AbstractOwnableSynchronizer 的 exclusiveOwnerThread 變量中
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    //當(dāng)前線程本來就持有鎖,進入重入邏輯,返回 true
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        //設(shè)置 state 變量
        setState(nextc);
        return true;
    }
    return false;
}

以上就是非公平鎖獲取鎖,總結(jié)一下非公平鎖獲取鎖的過程:

  1. lock() 第一次嘗試獲取鎖,成功則返回。
  2. nonfairTryAcquire() 再次嘗試獲取鎖。
  3. 失敗則調(diào)用 addWaiter() 封裝線程為 Node 節(jié)點加入同步隊列。
  4. acquireQueued() 自旋嘗試獲取鎖,成功則返回。
  5. shouldParkAfterFailedAcquire() 將線程設(shè)置為等待喚醒狀態(tài),阻塞當(dāng)前線程。
  6. 如果線程被喚醒,嘗試獲取鎖,成功則返回,失敗則繼續(xù)阻塞。

公平鎖和非公平鎖對比

在下圖源碼中可以看出,公平鎖多了一個 !hasQueuedPredecessors() 用來判斷是否有其他線程比當(dāng)前線程在同步隊列中排隊時間更長。除此之外,非公平鎖在初始時就有 2 次獲取鎖的機會,然后再到同步隊列中排隊。

圖片

unlock() 釋放鎖

獲取鎖之后必須得釋放,同一個線程不管重入了幾次鎖,必須得釋放幾次鎖,不然 state 變量將不會變成 0,鎖被永久占用,其他線程將永遠也獲取不到鎖。

//ReentrantLock.unlock()
public void unlock() {
    sync.release(1);
}

//AbstractQueuedSynchronizer.release()
public final boolean release(int arg) {
    //調(diào)用 Sync 的 tryRelease()
    if (tryRelease(arg)) {
        Node h = head;
        //首節(jié)點不是 null,首節(jié)點的 waitStatus 是 SIGNAL
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

//Sync.tryRelease()
protected final boolean tryRelease(int releases) {
    //state 變量減去 1
    int c = getState() - releases;
    //當(dāng)前線程不是占有鎖的線程,異常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    //state 變量成了 0
    if (c == 0) {
        free = true;
        //將當(dāng)前占有的線程設(shè)置為 null
        setExclusiveOwnerThread(null);
    }
    //設(shè)置 state 變量
    setState(c);
    return free;
}

//AbstractQueuedSynchronizer.unparkSuccessor()
private void unparkSuccessor(Node node) {
    //節(jié)點的 waitStatus 
    int ws = node.waitStatus;
    //為 SIGNAL 的時候,CAS 的方式更新為初始值 0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);
    //獲取下一個節(jié)點
    Node s = node.next;
    //下一個節(jié)點為空,或者 waitStatus 狀態(tài)為 CANCELLED
    if (s == null || s.waitStatus > 0) {
        s = null;
        //從最后一個節(jié)點開始找出 waitStatus 不是 CANCELLED 的節(jié)點
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    //下一個節(jié)點不是 null,喚醒它
    if (s != null)
        LockSupport.unpark(s.thread);
}

釋放鎖的邏輯就是 state 必須被減去 1 直到為 0,才可以喚醒下一個線程。

總結(jié)

ReentrantLock 主要是防止資源的使用沖突,保證同一個時間只能有一個線程在使用資源。比如:文件操作,同步發(fā)送消息等等。

本文分析了 ReentrantLock 的公平鎖和非公平鎖以及釋放鎖的原理,可以得出非公平鎖的效率比公平鎖效率高,非公平鎖初始時會 2 次獲取鎖,如果成功可以減少線程切換帶來的損耗。在非公平模式下,線程可能一直搶占不到鎖。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問題,請聯(lián)系本站處理。 舉報投訴
  • 接口
    +關(guān)注

    關(guān)注

    33

    文章

    8684

    瀏覽量

    151622
  • 源碼
    +關(guān)注

    關(guān)注

    8

    文章

    651

    瀏覽量

    29346
  • 函數(shù)
    +關(guān)注

    關(guān)注

    3

    文章

    4344

    瀏覽量

    62849
  • 線程
    +關(guān)注

    關(guān)注

    0

    文章

    505

    瀏覽量

    19718
收藏 人收藏

    評論

    相關(guān)推薦

    平衡創(chuàng)新與倫理:AI時代的隱私保護和算法公平

    ,如果醫(yī)生和患者都能了解AI推薦治療方案的原因,將大大增加對技術(shù)的接受度和信任。 算法公平性的保障同樣不可或缺。AI系統(tǒng)在設(shè)計時就需要考慮到多樣性和包容性,避免因為訓(xùn)練數(shù)據(jù)的偏差而導(dǎo)致結(jié)果的不公平
    發(fā)表于 07-16 15:07

    #硬聲創(chuàng)作季 【Redis分布式】從Redisson源碼剖析非公平加鎖機制

    編程語言源碼分布式ruby/rails
    Mr_haohao
    發(fā)布于 :2022年09月14日 07:21:34

    Lock體系結(jié)構(gòu)和讀寫機制解析

    接口之一,規(guī)定了資源使用的幾個基礎(chǔ)方法。ReentrantLock類實現(xiàn)Lock接口的可重入,即線程如果獲得當(dāng)前實例的,并進入任務(wù)方法,在線程沒有釋放
    發(fā)表于 01-05 17:53

    互斥量源碼分析測試

    文章目錄互斥量源碼分析測試參考資料:RTT官網(wǎng)文檔關(guān)鍵字:分析RT-Thread源碼、stm32、RTOS、互斥量。互斥量在其他書籍中的名稱:mutex :互斥
    發(fā)表于 08-24 06:01

    如何去獲取Arm Spinlock的公平性呢

    spinlock.不同的機制會有不同的CPU獲取到公平性問題。為了得到比較直觀感受,我寫了一個test application,在big.LITTLE的A53+A73的平臺,和在一個
    發(fā)表于 08-04 14:46

    具有隱私性的公平文檔交換協(xié)議

    提出一種新的公平文檔交換協(xié)議。在該協(xié)議中,交換雙方都各自擁有一個秘密消息,他們想以一種公平的方式交換他們的秘密消息,即交換結(jié)束后,交換雙方要么都獲得對方的秘密
    發(fā)表于 03-23 09:22 ?14次下載

    基于分層時間有色Petri網(wǎng)的支付協(xié)議公平分析

    電子支付協(xié)議是一種重要的電子商務(wù)協(xié)議,公平性是其重要的安全屬性之一。該文提出一種基于分層時間有色Petri 網(wǎng)(HTCPN)的電子支付協(xié)議形式化分析方法。該方法在進行公平分析
    發(fā)表于 11-17 13:38 ?9次下載

    存器的原理分析

    存器的原理分析 存器就是把單片機的輸出的數(shù)先存起來,可以讓單片機繼續(xù)做其它事.. 比如74HC373就是一種存器 它的LE為高
    發(fā)表于 03-09 09:54 ?6.8w次閱讀

    基于鄰近點算法的比例公平優(yōu)化方法

    (基于吞吐量的公平性),從而降低網(wǎng)絡(luò)整體的性能。為了克服這一性能異常問題,基于比例公平的優(yōu)化由于其吞吐量增強能力已經(jīng)引起廣大的關(guān)注。在本文中,提出了一種基于鄰近點算法的比例公平優(yōu)化方法,每個競爭節(jié)點根據(jù)其鏈路質(zhì)量的差異使用不同的
    發(fā)表于 11-11 10:42 ?7次下載
    基于鄰近點算法的比例<b class='flag-5'>公平</b>優(yōu)化方法

    基于公平心跳超時容錯機制

    針對官方的Hadoop軟件中提供的節(jié)點心跳超時容錯機制對短作業(yè)并不合理,而且忽略了異構(gòu)集群中各節(jié)點超期時間設(shè)置的公平性的問題,提出了公平心跳超時容錯機制。首先根據(jù)每個節(jié)點的可靠性及計算性能構(gòu)建節(jié)點
    發(fā)表于 01-02 10:43 ?0次下載

    公平高效機會干擾對齊算法

    中選擇信道質(zhì)量最優(yōu)的通信用戶,然后通過設(shè)計次基站的有用信號空間完全消除主小區(qū)用戶對次基站的干擾,進一步在次小區(qū)中以干擾泄露最小化為原則選擇通信用戶,最后從理論上分析證明了公平性和最小傳輸塊數(shù)等性能。仿真結(jié)果表明
    發(fā)表于 01-08 15:59 ?0次下載
    <b class='flag-5'>公平</b>高效機會干擾對齊算法

    人工智能將會是新式的“電力”,公平教育時代正在到來

    教育公平的觀念源遠流長,追求教育公平是人類社會古老的理念。
    的頭像 發(fā)表于 05-28 10:15 ?2352次閱讀

    比特幣分配機制最公平的原因是什么

    比特幣協(xié)議中最早設(shè)計的分配機制至今仍然是最公平、也是最可靠的。
    發(fā)表于 07-19 14:59 ?2207次閱讀

    人工智能的算法公平性實現(xiàn)

    我們解決了算法公平性的問題:確保分類器的結(jié)果不會偏向于敏感的變量值,比如年齡、種族或性別。由于一般的公平性度量可以表示為變量之間(條件)獨立性的度量,我們提出使用Renyi最大相關(guān)系數(shù)將公平性度量推廣到連續(xù)變量。
    發(fā)表于 11-06 17:04 ?2664次閱讀
    人工智能的算法<b class='flag-5'>公平</b>性實現(xiàn)

    怎么讓機器理解“什么是公平

    來源:ST社區(qū) “什么是公平”,就算是人類自己也沒有統(tǒng)一的標(biāo)準,它有時取決于語境。不論是在家里,還是在學(xué)校,教導(dǎo)小孩要公平是至關(guān)重要的,但說起來容易做起來難。正因為如此,我們要如何才能將社會上所說
    的頭像 發(fā)表于 12-22 22:06 ?486次閱讀