0
  • 聊天消息
  • 系統(tǒng)消息
  • 評(píng)論與回復(fù)
登錄后你可以
  • 下載海量資料
  • 學(xué)習(xí)在線課程
  • 觀看技術(shù)視頻
  • 寫文章/發(fā)帖/加入社區(qū)
會(huì)員中心
創(chuàng)作中心

完善資料讓更多小伙伴認(rèn)識(shí)你,還能領(lǐng)取20積分哦,立即完善>

3天內(nèi)不再提示

用Zookeeper怎么實(shí)現(xiàn)一個(gè)分布式鎖?

jf_78858299 ? 來(lái)源:JAVA旭陽(yáng) ? 作者:JAVA旭陽(yáng) ? 2023-05-11 11:02 ? 次閱讀

概述

提到鎖,想必大家可能最先想到的是Java JUC中的synchronized關(guān)鍵字或者可重入鎖ReentrantLock。它能夠保證我們的代碼在同一個(gè)時(shí)刻只有一個(gè)線程執(zhí)行,保證數(shù)據(jù)的一致性和完整性。但是它僅限于單體項(xiàng)目,也就是說(shuō)它們只能保證單個(gè)JVM應(yīng)用內(nèi)線程的順序執(zhí)行。

如果你部署了多個(gè)節(jié)點(diǎn),也就是分布式場(chǎng)景下如何保證不同節(jié)點(diǎn)在同一時(shí)刻只有一個(gè)線程執(zhí)行呢?場(chǎng)景的業(yè)務(wù)場(chǎng)景比如秒殺、搶優(yōu)惠券等,這就引入了我們的分布式鎖,本文我們主要講解利用Zookeeper的特性如何來(lái)實(shí)現(xiàn)我們的分布式鎖。

Zookeeper分布式鎖實(shí)現(xiàn)原理

利用Zookeeper的臨時(shí)順序節(jié)點(diǎn)和監(jiān)聽機(jī)制兩大特性,可以幫助我們實(shí)現(xiàn)分布式鎖。

圖片

  1. 首先得有一個(gè)持久節(jié)點(diǎn)/locks, 路徑服務(wù)于某個(gè)使用場(chǎng)景,如果有多個(gè)使用場(chǎng)景建議路徑不同。
  2. 請(qǐng)求進(jìn)來(lái)時(shí)首先在/locks創(chuàng)建臨時(shí)有序節(jié)點(diǎn),所有會(huì)看到在/locks下面有seq-000000000, seq-00000001 等等節(jié)點(diǎn)。
  3. 然后判斷當(dāng)前創(chuàng)建得節(jié)點(diǎn)是不是/locks路徑下面最小的節(jié)點(diǎn),如果是,獲取鎖,不是,阻塞線程,同時(shí)設(shè)置監(jiān)聽器,監(jiān)聽前一個(gè)節(jié)點(diǎn)。
  4. 獲取到鎖以后,開始處理業(yè)務(wù)邏輯,最后delete當(dāng)前節(jié)點(diǎn),表示釋放鎖。
  5. 后一個(gè)節(jié)點(diǎn)就會(huì)收到通知,喚起線程,重復(fù)上面的判斷。

大家有沒有想過(guò)為什么要設(shè)置對(duì)前一個(gè)節(jié)點(diǎn)的監(jiān)聽?

主要為了避免羊群效應(yīng)。所謂羊群效應(yīng)就是一個(gè)節(jié)點(diǎn)掛掉,所有節(jié)點(diǎn)都去監(jiān)聽,然后做出反應(yīng),這樣會(huì)給服務(wù)器帶來(lái)巨大壓力,所以有了臨時(shí)順序節(jié)點(diǎn),當(dāng)一個(gè)節(jié)點(diǎn)掛掉,只有它后面的那一個(gè)節(jié)點(diǎn)才做出反應(yīng)。

原生Zookeeper客戶端實(shí)現(xiàn)分布式鎖

通過(guò)原生zookeeper api方式的實(shí)現(xiàn),可以加強(qiáng)我們對(duì)zk實(shí)現(xiàn)分布式鎖原理的理解。

public class DistributedLock {

    private String connectString = "10.100.1.176:2281";

    private int sessionTimeout = 2000;

    private ZooKeeper zk;

    private String rootNode = "lock";

    private String subNode = "seq-";

    private String waitPath;

    // 當(dāng)前client創(chuàng)建的子節(jié)點(diǎn)
    private String currentNode;

    private CountDownLatch countDownLatch = new CountDownLatch(1);

    private CountDownLatch waitDownLatch = new CountDownLatch(1);

    public DistributedLock() throws IOException, InterruptedException, KeeperException {
        zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                // 如果連接建立時(shí),喚醒 wait 在該 latch 上的線程
                if(event.getState() == Event.KeeperState.SyncConnected) {
                    countDownLatch.countDown();
                }

                //  發(fā)生了 waitPath 的刪除事件
                if(event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
                    waitDownLatch.countDown();
                }
            }
        });

        // 等待連接建立,因?yàn)檫B接建立時(shí)異步過(guò)程
        countDownLatch.await();
        // 獲取根節(jié)點(diǎn)
        Stat stat = zk.exists("/" + rootNode, false);
        // 如果根節(jié)點(diǎn)不存在,則創(chuàng)建根節(jié)點(diǎn)
        if(stat == null) {
            System.out.println("創(chuàng)建根節(jié)點(diǎn)");
            zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        }
    }

    public void zkLock() {
        try {
            // 在根節(jié)點(diǎn)創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
            currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

            // 獲取子節(jié)點(diǎn)
            List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
            // 如果只有一個(gè)子節(jié)點(diǎn),說(shuō)明是當(dāng)前節(jié)點(diǎn),直接獲得鎖
            if(childrenNodes.size() == 1) {
                return;
            } else {
                //對(duì)根節(jié)點(diǎn)下的所有臨時(shí)順序節(jié)點(diǎn)進(jìn)行從小到大排序
                Collections.sort(childrenNodes);
                //當(dāng)前節(jié)點(diǎn)名稱
                String thisNode = currentNode.substring(("/" + rootNode + "/").length());
                //獲取當(dāng)前節(jié)點(diǎn)的位置
                int index = childrenNodes.indexOf(thisNode);
                if (index == -1) {
                    System.out.println("數(shù)據(jù)異常");
                } else if (index == 0) {
                    // index == 0, 說(shuō)明 thisNode 在列表中最小, 當(dāng)前client 獲得鎖
                    return;
                } else {
                    // 獲得排名比 currentNode 前 1 位的節(jié)點(diǎn)
                    this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
                    // 在 waitPath節(jié)點(diǎn)上注冊(cè)監(jiān)聽器, 當(dāng) waitPath 被刪除時(shí),zookeeper 會(huì)回調(diào)監(jiān)聽器的 process 方法
                    zk.getData(waitPath, true, new Stat());
                    //進(jìn)入等待鎖狀態(tài)
                    waitDownLatch.await();
                }
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void zkUnlock() {
        try {
            zk.delete(this.currentNode, -1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (KeeperException e) {
            e.printStackTrace();
        }
    }
}

測(cè)試代碼如下:

public class DistributedLockTest {

    public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
        DistributedLock lock1 = new DistributedLock();
        DistributedLock lock2 = new DistributedLock();

        new Thread(() -> {
            // 獲取鎖對(duì)象
            try {
                lock1.zkLock();
                System.out.println("線程 1 獲取鎖");
                Thread.sleep(5 * 1000);
                System.out.println("線程 1 釋放鎖");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock1.zkUnlock();
            }
        }).start();

        new Thread(() -> {
            // 獲取鎖對(duì)象
            try {
                lock2.zkLock();
                System.out.println("線程 2 獲取鎖");
                Thread.sleep(5 * 1000);

                System.out.println("線程 2 釋放鎖");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                lock2.zkUnlock();
            }
        }).start();
    }
}

測(cè)試結(jié)果:

線程 2 獲取鎖
線程 2 釋放鎖
線程 1 獲取鎖
線程 1 釋放鎖

獲取鎖和釋放鎖成對(duì)出現(xiàn),說(shuō)明分布式鎖生效了。

Curator框架實(shí)現(xiàn)分布式鎖

在實(shí)際的開發(fā)鐘,我們會(huì)直接使用成熟的框架Curator客戶端,它里面封裝了分布式鎖的實(shí)現(xiàn),避免我們?nèi)ブ貜?fù)造輪子。

  1. pom.xml添加如下依賴
<dependency>
            <groupId>org.apache.curator<span class="hljs-name"groupId>
            <artifactId>curator-recipes<span class="hljs-name"artifactId>
            <version>5.2.1<span class="hljs-name"version>
        <span class="hljs-name"dependency>
  1. 通過(guò)InterProcessLock實(shí)現(xiàn)分布式鎖
public class CuratorLockTest {

    private String connectString = "10.100.1.14:2181";

    private String rootNode = "/locks";

    public static void main(String[] args) {

        new CuratorLockTest().testLock();

    }

    public void testLock() {
        // 分布式鎖1
        InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
        // 分布式鎖2
        InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
        // 第一個(gè)線程
        new Thread(() -> {
            // 獲取鎖對(duì)象
            try {
                lock1.acquire();
                System.out.println("線程 1 獲取鎖");
                // 測(cè)試鎖重入
                lock1.acquire();
                System.out.println("線程 1 再次獲取鎖");
                Thread.sleep(5 * 1000);
                lock1.release();
                System.out.println("線程 1 釋放鎖");
                lock1.release();
                System.out.println("線程 1 再次釋放鎖");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();

        // 第二個(gè)線程
        new Thread(() -> {
            // 獲取鎖對(duì)象
            try {
                lock2.acquire();
                System.out.println("線程 2 獲取鎖");
                // 測(cè)試鎖重入
                lock2.acquire();
                System.out.println("線程 2 再次獲取鎖");
                Thread.sleep(5 * 1000);
                lock2.release();
                System.out.println("線程 2 釋放鎖");
                lock2.release();
                System.out.println("線程 2 再次釋放鎖");
            } catch (Exception e) {
                e.printStackTrace();
            }
        }).start();
    }

    public CuratorFramework getCuratorFramework() {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(connectString).connectionTimeoutMs(2000)
                .sessionTimeoutMs(2000)
                .retryPolicy(new ExponentialBackoffRetry(3000, 3)).build();
        // 連接
        client.start();
        System.out.println("zookeeper 初始化完成...");
        return client;
    }
}
  1. 結(jié)果展示
線程 1 釋放鎖
線程 1 再次釋放鎖
線程 2 獲取鎖
線程 2 再次獲取鎖
線程 2 釋放鎖
線程 2 再次釋放鎖

有興趣的看下源碼,它是通過(guò)wait、notify來(lái)實(shí)現(xiàn)阻塞。

代碼https://github.com/alvinlkk/awesome-java-full-demo/tree/master/zookeeper-demo/zookeeper-lock

總結(jié)

ZooKeeper分布式鎖(如InterProcessMutex),能有效的解決分布式鎖問(wèn)題,但是性能并不高。

因?yàn)槊看卧趧?chuàng)建鎖和釋放鎖的過(guò)程中,都要?jiǎng)討B(tài)創(chuàng)建、銷毀瞬時(shí)節(jié)點(diǎn)來(lái)實(shí)現(xiàn)鎖功能。大家知道,ZK中創(chuàng)建和刪除節(jié)點(diǎn)只能通過(guò)Leader服務(wù)器來(lái)執(zhí)行,然后Leader服務(wù)器還需要將數(shù)據(jù)同不到所有的Follower機(jī)器上,這樣頻繁的網(wǎng)絡(luò)通信,性能的短板是非常突出的。

在高性能,高并發(fā)的場(chǎng)景下,不建議使用ZooKeeper的分布式鎖,可以使用Redis的分布式鎖。而由于ZooKeeper的高可用特性,所以在并發(fā)量不是太高的場(chǎng)景,推薦使用ZooKeeper的分布式鎖。

聲明:本文內(nèi)容及配圖由入駐作者撰寫或者入駐合作網(wǎng)站授權(quán)轉(zhuǎn)載。文章觀點(diǎn)僅代表作者本人,不代表電子發(fā)燒友網(wǎng)立場(chǎng)。文章及其配圖僅供工程師學(xué)習(xí)之用,如有內(nèi)容侵權(quán)或者其他違規(guī)問(wèn)題,請(qǐng)聯(lián)系本站處理。 舉報(bào)投訴
  • JAVA
    +關(guān)注

    關(guān)注

    19

    文章

    2967

    瀏覽量

    104762
  • 代碼
    +關(guān)注

    關(guān)注

    30

    文章

    4788

    瀏覽量

    68625
  • JVM
    JVM
    +關(guān)注

    關(guān)注

    0

    文章

    158

    瀏覽量

    12228
  • 線程
    +關(guān)注

    關(guān)注

    0

    文章

    504

    瀏覽量

    19687
  • zookeeper
    +關(guān)注

    關(guān)注

    0

    文章

    33

    瀏覽量

    3683
收藏 人收藏

    評(píng)論

    相關(guān)推薦

    在 Java 中利用 redis 實(shí)現(xiàn)個(gè)分布式服務(wù)

    在 Java 中利用 redis 實(shí)現(xiàn)個(gè)分布式服務(wù)
    發(fā)表于 07-05 13:14

    ZooKeeper分布式橋梁開發(fā)

    從傳統(tǒng)Java Web轉(zhuǎn)入分布式系統(tǒng)應(yīng)用,再到接觸分布式協(xié)調(diào)框架ZooKeeper,通過(guò)痛苦的思維邏輯和理念轉(zhuǎn)變,歷經(jīng)一個(gè)月時(shí)間,小伙伴們終于把Zo
    發(fā)表于 10-09 17:46 ?0次下載
    <b class='flag-5'>ZooKeeper</b><b class='flag-5'>分布式</b>橋梁開發(fā)

    Redis 分布式的正確實(shí)現(xiàn)方式

    分布式般有三種實(shí)現(xiàn)方式:1. 數(shù)據(jù)庫(kù)樂觀;2. 基于Redis的分布式
    的頭像 發(fā)表于 05-31 14:19 ?3597次閱讀

    開源分布式協(xié)調(diào)框架Zookeeper個(gè)知識(shí)點(diǎn)詳解

    1 ZooKeeper簡(jiǎn)介 ZooKeeper個(gè)開源的分布式協(xié)調(diào)框架,它的定位是為分布式應(yīng)
    的頭像 發(fā)表于 05-03 09:32 ?1768次閱讀
    開源<b class='flag-5'>分布式</b>協(xié)調(diào)框架<b class='flag-5'>Zookeeper</b>五<b class='flag-5'>個(gè)</b>知識(shí)點(diǎn)詳解

    為什么需要分布式 基于Zookeeper安全嗎

    講清楚。導(dǎo)致很多讀者看了很多文章,依舊云里霧里。例如下面這些問(wèn)題,你能清晰地回答上來(lái)嗎? 基于 Redis 如何實(shí)現(xiàn)個(gè)分布式? Redi
    的頭像 發(fā)表于 08-10 18:06 ?5610次閱讀

    為什么需要分布式?基于Redis如何實(shí)現(xiàn)個(gè)分布式

    分布式鎖相對(duì)應(yīng)的是「單機(jī)」,我們?cè)趯懚嗑€程程序時(shí),避免同時(shí)操作個(gè)共享變量產(chǎn)生數(shù)據(jù)問(wèn)題,通常會(huì)使用
    的頭像 發(fā)表于 03-24 11:55 ?1129次閱讀

    深入理解redis分布式

    系統(tǒng)不同進(jìn)程共同訪問(wèn)共享資源的實(shí)現(xiàn)。如果不同的系統(tǒng)或同一個(gè)系統(tǒng)的不同主機(jī)之間共享了某個(gè)臨界資源,往往需要互斥來(lái)防止彼此干擾,以保證
    的頭像 發(fā)表于 10-08 14:13 ?958次閱讀
    深入理解redis<b class='flag-5'>分布式</b><b class='flag-5'>鎖</b>

    tldb提供分布式使用方法

    前言:分布式分布式系統(tǒng)中個(gè)極為重要的工具。目前有多種分布式
    的頭像 發(fā)表于 11-02 14:44 ?898次閱讀
    tldb提供<b class='flag-5'>分布式</b><b class='flag-5'>鎖</b>使用方法

    redis分布式如何實(shí)現(xiàn)

    的情況,分布式的作用就是確保在同時(shí)間只有個(gè)客戶端可以訪問(wèn)共享資源,從而保證數(shù)據(jù)的致性和正
    的頭像 發(fā)表于 11-16 11:29 ?540次閱讀

    zookeeper分布式原理

    是提供個(gè)高可用的、致性的機(jī)制,用于解決分布式系統(tǒng)中常見的致性問(wèn)題,比如Leader選舉、分布式
    的頭像 發(fā)表于 12-03 16:33 ?651次閱讀

    Zookeeper的原理和作用

    Zookeeper個(gè)分布式協(xié)調(diào)服務(wù),它提供了組豐富的API和工具,用于構(gòu)建分布式應(yīng)用。它可
    的頭像 發(fā)表于 12-03 16:45 ?1509次閱讀

    zookeeper的核心配置文件是什么

    來(lái)定制化Zookeeper的行為和性能。 、介紹 Zookeeper個(gè)高性能的分布式協(xié)調(diào)服
    的頭像 發(fā)表于 12-04 10:33 ?817次閱讀

    redis分布式個(gè)方法

    Redis是種高性能的分布式緩存和鍵值存儲(chǔ)系統(tǒng),它提供了種可靠的分布式解決方案。在分布式
    的頭像 發(fā)表于 12-04 11:22 ?1464次閱讀

    如何實(shí)現(xiàn)Redis分布式

    Redis是個(gè)開源的內(nèi)存數(shù)據(jù)存儲(chǔ)系統(tǒng),可用于高速讀寫操作。在分布式系統(tǒng)中,為了保證數(shù)據(jù)的致性和避免競(jìng)態(tài)條件,常常需要使用分布式
    的頭像 發(fā)表于 12-04 11:24 ?707次閱讀

    分布式的三種實(shí)現(xiàn)方式

    ,下面將分別介紹三種常見的實(shí)現(xiàn)方式。 、基于數(shù)據(jù)庫(kù)實(shí)現(xiàn)分布式分布式系統(tǒng)中,數(shù)據(jù)庫(kù)是最常
    的頭像 發(fā)表于 12-28 10:01 ?905次閱讀