概述
提到鎖,想必大家可能最先想到的是Java JUC中的synchronized
關(guān)鍵字或者可重入鎖ReentrantLock
。它能夠保證我們的代碼在同一個(gè)時(shí)刻只有一個(gè)線程執(zhí)行,保證數(shù)據(jù)的一致性和完整性。但是它僅限于單體項(xiàng)目,也就是說(shuō)它們只能保證單個(gè)JVM應(yīng)用內(nèi)線程的順序執(zhí)行。
如果你部署了多個(gè)節(jié)點(diǎn),也就是分布式場(chǎng)景下如何保證不同節(jié)點(diǎn)在同一時(shí)刻只有一個(gè)線程執(zhí)行呢?場(chǎng)景的業(yè)務(wù)場(chǎng)景比如秒殺、搶優(yōu)惠券等,這就引入了我們的分布式鎖,本文我們主要講解利用Zookeeper的特性如何來(lái)實(shí)現(xiàn)我們的分布式鎖。
Zookeeper分布式鎖實(shí)現(xiàn)原理
利用Zookeeper的臨時(shí)順序節(jié)點(diǎn)和監(jiān)聽機(jī)制兩大特性,可以幫助我們實(shí)現(xiàn)分布式鎖。
- 首先得有一個(gè)持久節(jié)點(diǎn)
/locks
, 路徑服務(wù)于某個(gè)使用場(chǎng)景,如果有多個(gè)使用場(chǎng)景建議路徑不同。 - 請(qǐng)求進(jìn)來(lái)時(shí)首先在
/locks
創(chuàng)建臨時(shí)有序節(jié)點(diǎn),所有會(huì)看到在/locks
下面有seq-000000000, seq-00000001 等等節(jié)點(diǎn)。 - 然后判斷當(dāng)前創(chuàng)建得節(jié)點(diǎn)是不是
/locks
路徑下面最小的節(jié)點(diǎn),如果是,獲取鎖,不是,阻塞線程,同時(shí)設(shè)置監(jiān)聽器,監(jiān)聽前一個(gè)節(jié)點(diǎn)。 - 獲取到鎖以后,開始處理業(yè)務(wù)邏輯,最后delete當(dāng)前節(jié)點(diǎn),表示釋放鎖。
- 后一個(gè)節(jié)點(diǎn)就會(huì)收到通知,喚起線程,重復(fù)上面的判斷。
大家有沒有想過(guò)為什么要設(shè)置對(duì)前一個(gè)節(jié)點(diǎn)的監(jiān)聽?
主要為了避免羊群效應(yīng)。所謂羊群效應(yīng)就是一個(gè)節(jié)點(diǎn)掛掉,所有節(jié)點(diǎn)都去監(jiān)聽,然后做出反應(yīng),這樣會(huì)給服務(wù)器帶來(lái)巨大壓力,所以有了臨時(shí)順序節(jié)點(diǎn),當(dāng)一個(gè)節(jié)點(diǎn)掛掉,只有它后面的那一個(gè)節(jié)點(diǎn)才做出反應(yīng)。
原生Zookeeper客戶端實(shí)現(xiàn)分布式鎖
通過(guò)原生zookeeper api方式的實(shí)現(xiàn),可以加強(qiáng)我們對(duì)zk實(shí)現(xiàn)分布式鎖原理的理解。
public class DistributedLock {
private String connectString = "10.100.1.176:2281";
private int sessionTimeout = 2000;
private ZooKeeper zk;
private String rootNode = "lock";
private String subNode = "seq-";
private String waitPath;
// 當(dāng)前client創(chuàng)建的子節(jié)點(diǎn)
private String currentNode;
private CountDownLatch countDownLatch = new CountDownLatch(1);
private CountDownLatch waitDownLatch = new CountDownLatch(1);
public DistributedLock() throws IOException, InterruptedException, KeeperException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
// 如果連接建立時(shí),喚醒 wait 在該 latch 上的線程
if(event.getState() == Event.KeeperState.SyncConnected) {
countDownLatch.countDown();
}
// 發(fā)生了 waitPath 的刪除事件
if(event.getType() == Event.EventType.NodeDeleted && event.getPath().equals(waitPath)) {
waitDownLatch.countDown();
}
}
});
// 等待連接建立,因?yàn)檫B接建立時(shí)異步過(guò)程
countDownLatch.await();
// 獲取根節(jié)點(diǎn)
Stat stat = zk.exists("/" + rootNode, false);
// 如果根節(jié)點(diǎn)不存在,則創(chuàng)建根節(jié)點(diǎn)
if(stat == null) {
System.out.println("創(chuàng)建根節(jié)點(diǎn)");
zk.create("/" + rootNode, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
}
public void zkLock() {
try {
// 在根節(jié)點(diǎn)創(chuàng)建臨時(shí)順序節(jié)點(diǎn)
currentNode = zk.create("/" + rootNode + "/" + subNode, null, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
// 獲取子節(jié)點(diǎn)
List<String> childrenNodes = zk.getChildren("/" + rootNode, false);
// 如果只有一個(gè)子節(jié)點(diǎn),說(shuō)明是當(dāng)前節(jié)點(diǎn),直接獲得鎖
if(childrenNodes.size() == 1) {
return;
} else {
//對(duì)根節(jié)點(diǎn)下的所有臨時(shí)順序節(jié)點(diǎn)進(jìn)行從小到大排序
Collections.sort(childrenNodes);
//當(dāng)前節(jié)點(diǎn)名稱
String thisNode = currentNode.substring(("/" + rootNode + "/").length());
//獲取當(dāng)前節(jié)點(diǎn)的位置
int index = childrenNodes.indexOf(thisNode);
if (index == -1) {
System.out.println("數(shù)據(jù)異常");
} else if (index == 0) {
// index == 0, 說(shuō)明 thisNode 在列表中最小, 當(dāng)前client 獲得鎖
return;
} else {
// 獲得排名比 currentNode 前 1 位的節(jié)點(diǎn)
this.waitPath = "/" + rootNode + "/" + childrenNodes.get(index - 1);
// 在 waitPath節(jié)點(diǎn)上注冊(cè)監(jiān)聽器, 當(dāng) waitPath 被刪除時(shí),zookeeper 會(huì)回調(diào)監(jiān)聽器的 process 方法
zk.getData(waitPath, true, new Stat());
//進(jìn)入等待鎖狀態(tài)
waitDownLatch.await();
}
}
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void zkUnlock() {
try {
zk.delete(this.currentNode, -1);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (KeeperException e) {
e.printStackTrace();
}
}
}
測(cè)試代碼如下:
public class DistributedLockTest {
public static void main(String[] args) throws IOException, InterruptedException, KeeperException {
DistributedLock lock1 = new DistributedLock();
DistributedLock lock2 = new DistributedLock();
new Thread(() -> {
// 獲取鎖對(duì)象
try {
lock1.zkLock();
System.out.println("線程 1 獲取鎖");
Thread.sleep(5 * 1000);
System.out.println("線程 1 釋放鎖");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock1.zkUnlock();
}
}).start();
new Thread(() -> {
// 獲取鎖對(duì)象
try {
lock2.zkLock();
System.out.println("線程 2 獲取鎖");
Thread.sleep(5 * 1000);
System.out.println("線程 2 釋放鎖");
} catch (Exception e) {
e.printStackTrace();
} finally {
lock2.zkUnlock();
}
}).start();
}
}
測(cè)試結(jié)果:
線程 2 獲取鎖
線程 2 釋放鎖
線程 1 獲取鎖
線程 1 釋放鎖
獲取鎖和釋放鎖成對(duì)出現(xiàn),說(shuō)明分布式鎖生效了。
Curator框架實(shí)現(xiàn)分布式鎖
在實(shí)際的開發(fā)鐘,我們會(huì)直接使用成熟的框架Curator客戶端,它里面封裝了分布式鎖的實(shí)現(xiàn),避免我們?nèi)ブ貜?fù)造輪子。
- pom.xml添加如下依賴
<dependency>
<groupId>org.apache.curator<span class="hljs-name"groupId>
<artifactId>curator-recipes<span class="hljs-name"artifactId>
<version>5.2.1<span class="hljs-name"version>
<span class="hljs-name"dependency>
- 通過(guò)
InterProcessLock
實(shí)現(xiàn)分布式鎖
public class CuratorLockTest {
private String connectString = "10.100.1.14:2181";
private String rootNode = "/locks";
public static void main(String[] args) {
new CuratorLockTest().testLock();
}
public void testLock() {
// 分布式鎖1
InterProcessLock lock1 = new InterProcessMutex(getCuratorFramework(), rootNode);
// 分布式鎖2
InterProcessLock lock2 = new InterProcessMutex(getCuratorFramework(), rootNode);
// 第一個(gè)線程
new Thread(() -> {
// 獲取鎖對(duì)象
try {
lock1.acquire();
System.out.println("線程 1 獲取鎖");
// 測(cè)試鎖重入
lock1.acquire();
System.out.println("線程 1 再次獲取鎖");
Thread.sleep(5 * 1000);
lock1.release();
System.out.println("線程 1 釋放鎖");
lock1.release();
System.out.println("線程 1 再次釋放鎖");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
// 第二個(gè)線程
new Thread(() -> {
// 獲取鎖對(duì)象
try {
lock2.acquire();
System.out.println("線程 2 獲取鎖");
// 測(cè)試鎖重入
lock2.acquire();
System.out.println("線程 2 再次獲取鎖");
Thread.sleep(5 * 1000);
lock2.release();
System.out.println("線程 2 釋放鎖");
lock2.release();
System.out.println("線程 2 再次釋放鎖");
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
public CuratorFramework getCuratorFramework() {
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(connectString).connectionTimeoutMs(2000)
.sessionTimeoutMs(2000)
.retryPolicy(new ExponentialBackoffRetry(3000, 3)).build();
// 連接
client.start();
System.out.println("zookeeper 初始化完成...");
return client;
}
}
- 結(jié)果展示
線程 1 釋放鎖
線程 1 再次釋放鎖
線程 2 獲取鎖
線程 2 再次獲取鎖
線程 2 釋放鎖
線程 2 再次釋放鎖
有興趣的看下源碼,它是通過(guò)wait、notify來(lái)實(shí)現(xiàn)阻塞。
代碼 : https://github.com/alvinlkk/awesome-java-full-demo/tree/master/zookeeper-demo/zookeeper-lock
總結(jié)
ZooKeeper
分布式鎖(如InterProcessMutex
),能有效的解決分布式鎖問(wèn)題,但是性能并不高。
因?yàn)槊看卧趧?chuàng)建鎖和釋放鎖的過(guò)程中,都要?jiǎng)討B(tài)創(chuàng)建、銷毀瞬時(shí)節(jié)點(diǎn)來(lái)實(shí)現(xiàn)鎖功能。大家知道,ZK中創(chuàng)建和刪除節(jié)點(diǎn)只能通過(guò)Leader服務(wù)器來(lái)執(zhí)行,然后Leader
服務(wù)器還需要將數(shù)據(jù)同不到所有的Follower
機(jī)器上,這樣頻繁的網(wǎng)絡(luò)通信,性能的短板是非常突出的。
在高性能,高并發(fā)的場(chǎng)景下,不建議使用ZooKeeper
的分布式鎖,可以使用Redis
的分布式鎖。而由于ZooKeeper
的高可用特性,所以在并發(fā)量不是太高的場(chǎng)景,推薦使用ZooKeeper
的分布式鎖。
-
JAVA
+關(guān)注
關(guān)注
19文章
2967瀏覽量
104762 -
代碼
+關(guān)注
關(guān)注
30文章
4788瀏覽量
68625 -
JVM
+關(guān)注
關(guān)注
0文章
158瀏覽量
12228 -
線程
+關(guān)注
關(guān)注
0文章
504瀏覽量
19687 -
zookeeper
+關(guān)注
關(guān)注
0文章
33瀏覽量
3683
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論