Rust是一種系統(tǒng)級(jí)編程語(yǔ)言,它帶有嚴(yán)格的內(nèi)存管理、并發(fā)和安全性規(guī)則,因此很受廣大程序員的青睞。RwLock(讀寫(xiě)鎖)是 Rust 中常用的線程同步機(jī)制之一,本文將詳細(xì)介紹 Rust 語(yǔ)言中的 RwLock 的內(nèi)部實(shí)現(xiàn)原理、常用接口的使用技巧和最佳實(shí)踐。
RwLock 的內(nèi)部實(shí)現(xiàn)原理
基本概念
RwLock 是一種讀寫(xiě)分離的鎖,允許多個(gè)線程同時(shí)讀取共享數(shù)據(jù),但只允許一個(gè)線程寫(xiě)入數(shù)據(jù)。通過(guò)這種方式,可以避免讀寫(xiě)操作之間的競(jìng)爭(zhēng),從而提高并發(fā)性能。
在 Rust 中,RwLock 的實(shí)現(xiàn)基于 std::sync::RwLock 結(jié)構(gòu)體。其中,T 表示被保護(hù)的數(shù)據(jù)類型,需要滿足 Send 特質(zhì)以便可以在線程之間傳遞,并且需要滿足 Sync 特質(zhì)以便可以在線程之間共享。
RwLock 是在 std::sync::RwLock 結(jié)構(gòu)體上實(shí)現(xiàn)的,為了方便說(shuō)明,下文中假設(shè) T 為 u32 類型。
RwLock 的基本結(jié)構(gòu)
RwLock 的基本結(jié)構(gòu)如下:
use std::sync::RwLock;
let lock = RwLock::new(0u32);
該代碼將創(chuàng)建一個(gè) RwLock 對(duì)象,其中 T 類型為 u32,初始化值為 0,即該鎖保護(hù)的是一個(gè)名為 data 的 u32 類型變量。
RwLock 的鎖定機(jī)制
我們可以通過(guò)鎖定 RwLock 來(lái)對(duì)數(shù)據(jù)進(jìn)行保護(hù)。RwLock 提供了四個(gè)方法來(lái)完成鎖定操作:
read()
方法:獲取讀鎖,并返回一個(gè) RAII(資源獲取即初始化)的讀取守衛(wèi)。多個(gè)線程可以同時(shí)獲取讀鎖,但是不能同時(shí)持有寫(xiě)鎖。
try_read()
方法:非阻塞地獲取讀鎖。如果讀鎖已經(jīng)被占用,則返回 None。
write()
方法:獲取寫(xiě)鎖,并返回一個(gè) RAII 的寫(xiě)入守衛(wèi)。如果有任何線程正在持有讀鎖或?qū)戞i,則阻塞等待直到它們釋放鎖。
try_write()
方法:非阻塞地獲取寫(xiě)鎖。如果寫(xiě)鎖已經(jīng)被占用,則返回None
。
對(duì)于讀寫(xiě)鎖,我們需要保證寫(xiě)操作在讀操作之前,因此,在調(diào)用 write 方法時(shí),會(huì)等待所有的讀取守衛(wèi)被釋放,并阻止新的讀取守衛(wèi)的創(chuàng)建。為了避免死鎖和優(yōu)先級(jí)反轉(zhuǎn),寫(xiě)入守衛(wèi)還可以降低優(yōu)先級(jí)。
讀寫(xiě)鎖的實(shí)現(xiàn)主要是通過(guò)兩個(gè) Mutex 來(lái)實(shí)現(xiàn)的。一個(gè) Mutex 用于保護(hù)讀取計(jì)數(shù)器,另一個(gè) Mutex 用于保護(hù)寫(xiě)入狀態(tài)。讀取計(jì)數(shù)器統(tǒng)計(jì)當(dāng)前存在多少個(gè)讀取鎖,每當(dāng)一個(gè)新的讀取鎖被請(qǐng)求時(shí),讀取計(jì)數(shù)器就會(huì)自增。當(dāng)讀取計(jì)數(shù)器為 0 時(shí),寫(xiě)入鎖可以被請(qǐng)求。
RwLock 的 Poisoning
類似于 Mutex,RwLock 也支持 poisoning 機(jī)制。如果 RwLock 發(fā)生 panic,那么鎖就成了 poison 狀態(tài),也就是無(wú)法再被使用。任何試圖獲取這個(gè)鎖的線程都會(huì) panic,而不是被阻塞。
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let lock = Arc::new(RwLock::new(0u32));
let readers = (0..6)
.map(|_| {
let lock = lock.clone();
thread::spawn(move || {
let guard = lock.read().unwrap();
println!("read: {}", *guard);
})
})
.collect::< Vec< _ >>();
let writers = (0..2)
.map(|_| {
let lock = lock.clone();
thread::spawn(move || {
let mut guard = lock.write().unwrap();
*guard += 1;
println!("write: {}", *guard);
})
})
.collect::< Vec< _ >>();
for reader in readers {
reader.join().unwrap();
}
for writer in writers {
writer.join().unwrap();
}
}
運(yùn)行后,可能會(huì)出現(xiàn)以下異常信息:
thread 'main' panicked at 'PoisonError { inner: ...
這里的 inner
表示調(diào)用 RwLock 的線程 panic 時(shí)產(chǎn)生的錯(cuò)誤信息。
常用接口的使用技巧
read()
方法
read()
方法用于獲取讀鎖,并返回一個(gè) RAII 的讀取守衛(wèi):
let lock = RwLock::new(0u32);
let r1 = lock.read().unwrap();
let r2 = lock.read().unwrap();
在上面的例子中,r1 和 r2 都是 RwLockWriteGuard 類型的對(duì)象,它們引用的數(shù)據(jù)類型是 u32。這意味著它們只允許讀取 u32 類型的數(shù)據(jù),并且無(wú)法改變它們的值。
讀取守衛(wèi)被析構(gòu)時(shí),RwLock 的讀取計(jì)數(shù)器會(huì)減少,如果讀取計(jì)數(shù)器變?yōu)?0,則寫(xiě)入鎖可以被請(qǐng)求。
write()
方法
write()
方法用于獲取寫(xiě)鎖,并返回一個(gè) RAII 的寫(xiě)入守衛(wèi):
let lock = RwLock::new(0u32);
let mut w1 = lock.write().unwrap();
let mut w2 = lock.write().unwrap();
在上面的例子中,w1 和 w2 都是 RwLockWriteGuard 類型的對(duì)象,它們引用的數(shù)據(jù)類型是 u32。這意味著它們?cè)试S讀寫(xiě) u32 類型的數(shù)據(jù),并且可以改變它們的值。
寫(xiě)入守衛(wèi)被析構(gòu)時(shí),寫(xiě)入鎖立即被釋放,并且所有等待讀取鎖和寫(xiě)入鎖的線程都可以開(kāi)始運(yùn)行。
try_read()
方法
try_read()
方法用于非阻塞地獲取讀鎖。如果讀鎖已經(jīng)被占用,則返回 None。
let lock = RwLock::new(0u32);
if let Some(r) = lock.try_read() {
println!("read: {}", *r);
} else {
println!("read lock is already taken");
}
try_write()
方法
try_write()
方法用于非阻塞地獲取寫(xiě)鎖。如果寫(xiě)鎖已經(jīng)被占用,則返回 None。
let lock = RwLock::new(0u32);
if let Some(mut w) = lock.try_write() {
*w += 1;
println!("write: {}", *w);
} else {
println!("write lock is already taken");
}
共享所有權(quán)
如果你想在多個(gè)線程之間共享一個(gè) RwLock 對(duì)象,就需要使用 Arc(atomic reference counting,原子引用計(jì)數(shù))來(lái)包裝它:
use std::sync::{Arc, RwLock};
use std::thread;
fn main() {
let lock = Arc::new(RwLock::new(0u32));
let readers = (0..6)
.map(|_| {
let lock = lock.clone();
thread::spawn(move || {
let guard = lock.read().unwrap();
println!("read: {}", *guard);
})
})
.collect::< Vec< _ >>();
let writers = (0..2)
.map(|_| {
let lock = lock.clone();
thread::spawn(move || {
let mut guard = lock.write().unwrap();
*guard += 1;
println!("write: {}", *guard);
})
})
.collect::< Vec< _ >>();
for reader in readers {
reader.join().unwrap();
}
for writer in writers {
writer.join().unwrap();
}
}
// 輸出結(jié)果:
// read: 0
// read: 0
// read: 0
// read: 0
// read: 0
// read: 0
// write: 1
// write: 2
實(shí)現(xiàn)鎖超時(shí)功能
Rust標(biāo)準(zhǔn)庫(kù)中的RwLock目前是不支持讀/寫(xiě)超時(shí)功能的。我們可以利用RwLock中非阻塞方法try_read和try_write實(shí)現(xiàn)超時(shí)的特征。
下面進(jìn)一步講解使用std::sync::RwLock和std::time::Duration來(lái)實(shí)現(xiàn)讀超時(shí),具體步驟如下:
- 創(chuàng)建一個(gè)名為TimeoutRwLock的trait,其中包含read_timeout方法。
- 在TimeoutRwLock中添加默認(rèn)實(shí)現(xiàn)(default impl)。
- 在read_timeout方法中,通過(guò)RwLock的try_read_with_timeout方法來(lái)嘗試獲取讀取器(Reader),并且指定一個(gè)等待時(shí)間。
- 如果在等待時(shí)間內(nèi)成功獲取到讀取器,那么將讀取器返回;否則,返回一個(gè)錯(cuò)誤。 下面是代碼實(shí)現(xiàn):
use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::Duration;
use std::thread;
use std::thread::sleep;
trait TimeoutRwLock< T > {
fn read_timeout(&self, timeout: Duration) - > Result< RwLockReadGuard< '_, T >, String > {
match self.try_read_with_timeout(timeout) {
Ok(guard) = > Ok(guard),
Err(_) = > Err(String::from("timeout")),
}
}
fn try_read_with_timeout(&self, timeout: Duration) - > Result< RwLockReadGuard< '_, T >, () >;
}
impl< T > TimeoutRwLock< T > for RwLock< T > {
fn try_read_with_timeout(&self, timeout: Duration) - > Result< RwLockReadGuard< '_, T >, () > {
let now = std::time::Instant::now();
loop {
match self.try_read() {
Ok(guard) = > return Ok(guard),
Err(_) = > {
if now.elapsed() >= timeout {
return Err(());
}
std::thread::sleep(Duration::from_millis(10));
}
}
}
}
}
fn main() {
let lock = Arc::new(RwLock::new(0u32));
let reader = {
let lock = lock.clone();
thread::spawn(
move || match lock.read_timeout(Duration::from_millis(100)) {
Ok(guard) = > {
println!("read: {}", *guard);
}
Err(e) = > {
println!("error: {:?}", e);
}
},
)
};
let writer = {
let lock = lock.clone();
thread::spawn(move || {
sleep(Duration::from_secs(1));
let mut guard = lock.write().unwrap();
*guard += 1;
println!("write: {}", *guard);
})
};
reader.join().unwrap();
writer.join().unwrap();
}
// 輸出結(jié)果:
// read: 0
// write: 1
在這個(gè)實(shí)現(xiàn)中,trait TimeoutRwLock中定義了一個(gè)read_timeout方法,它與try_read方法具有相同的輸入參數(shù)類型和輸出類型。default impl方法是一個(gè)嘗試在給定的等待時(shí)間內(nèi)獲取讀取器(Reader)的循環(huán),并在等待過(guò)程中使用線程(thread)的park_timeout方法來(lái)避免 CPU 占用過(guò)高。如果在等待時(shí)間內(nèi)成功獲取到讀取器(Reader),則返回讀取器;否則返回一個(gè)錯(cuò)誤。
當(dāng)然,除了自己實(shí)現(xiàn)Trait外,還可以使用成熟的第三方庫(kù),例如:parking_lot
RwLock最佳實(shí)踐
- ? 避免使用鎖
鎖是一種解決并發(fā)問(wèn)題的基本機(jī)制,但由于鎖會(huì)引入競(jìng)爭(zhēng)條件、死鎖和其他問(wèn)題,因此應(yīng)盡量避免使用鎖。如果可能,應(yīng)使用更高級(jí)別的機(jī)制,例如 Rust 的通道(channel)。
- ? 避免過(guò)度使用讀寫(xiě)鎖
在某些情況下,讀寫(xiě)鎖可能會(huì)比互斥鎖更慢。例如,如果有太多的讀取器,并且它們?cè)趽碛凶x取鎖時(shí)花費(fèi)了大量時(shí)間,那么寫(xiě)入器的等待時(shí)間可能會(huì)很長(zhǎng)。因此,使用讀寫(xiě)鎖時(shí),應(yīng)仔細(xì)考慮讀寫(xiě)比例,以避免過(guò)度使用讀寫(xiě)鎖。
- ? 鎖的可重入性
RwLock 是可重入的;一個(gè)線程占有寫(xiě)鎖時(shí)可以再次占有讀鎖,并且同樣可以占有寫(xiě)鎖。但這種情況要非常小心,因?yàn)榭赡軙?huì)導(dǎo)致死鎖。
- ? 盡量縮小鎖的范圍
鎖的范圍越小,競(jìng)爭(zhēng)就越少,性能就越好。因此,應(yīng)盡量在需要的地方使用鎖,而在不需要的地方釋放鎖。例如,在讀寫(xiě)數(shù)據(jù)之前,可以先將數(shù)據(jù)復(fù)制到本地變量中,然后釋放鎖,以便其它線程可以訪問(wèn)該數(shù)據(jù),而不必爭(zhēng)奪鎖。在本地變量上執(zhí)行讀寫(xiě)操作時(shí),不需要鎖定。
- ? 鎖的超時(shí)設(shè)置
在使用鎖時(shí),應(yīng)該避免出現(xiàn)無(wú)限等待的情況??梢允褂脦С瑫r(shí)的鎖,當(dāng)?shù)却龝r(shí)間超過(guò)指定的時(shí)間時(shí),會(huì)返回一個(gè)錯(cuò)誤。這將防止出現(xiàn)死鎖或其他問(wèn)題。
// 引入第三方庫(kù)處理超時(shí)
// parking_lot = "0.12.1"
use parking_lot::RwLock;
use std::sync::Arc;
use std::thread;
use std::time::{Duration, Instant};
fn main() {
let rwlock = Arc::new(RwLock::new(0));
let start = Instant::now();
// 嘗試在 1 秒內(nèi)獲取讀鎖
let reader = loop {
if let Some(r) = rwlock.try_read_for(Duration::from_secs(1)) {
break r;
}
if start.elapsed() >= Duration::from_secs(5) {
panic!("Failed to acquire read lock within 5 seconds.");
}
};
// 嘗試在 1 秒內(nèi)獲取寫(xiě)鎖
let mut writer = loop {
if let Some(w) = rwlock.try_write_for(Duration::from_secs(1)) {
break w;
}
if start.elapsed() >= Duration::from_secs(5) {
panic!("Failed to acquire write lock within 5 seconds.");
}
};
// 進(jìn)行讀寫(xiě)操作
println!("Reader: {}", *reader);
*writer += 1;
println!("Writer: {}", *writer);
}
在上面的例子中,讀取器等待 100 毫秒后超時(shí),寫(xiě)入器等待 1 秒鐘才能成功完成寫(xiě)入。
總結(jié)
RwLock 是 Rust 中一種常用的線程同步機(jī)制,可以提高程序的并發(fā)性能。它只允許一個(gè)線程寫(xiě)入數(shù)據(jù),但可以讓多個(gè)線程同時(shí)讀取同一個(gè)數(shù)據(jù)。具體來(lái)說(shuō),RwLock 在實(shí)現(xiàn)上使用了兩個(gè) Mutex,一個(gè)用于保護(hù)讀取計(jì)數(shù)器,另一個(gè)用于保護(hù)寫(xiě)入狀態(tài)。在使用 RwLock 時(shí),應(yīng)該注意縮小鎖的范圍、避免使用過(guò)多讀寫(xiě)鎖以及防止死鎖等問(wèn)題。
-
編程語(yǔ)言
+關(guān)注
關(guān)注
10文章
1945瀏覽量
34740 -
程序
+關(guān)注
關(guān)注
117文章
3787瀏覽量
81049 -
內(nèi)存管理
+關(guān)注
關(guān)注
0文章
168瀏覽量
14141 -
rust語(yǔ)言
+關(guān)注
關(guān)注
0文章
57瀏覽量
3009
發(fā)布評(píng)論請(qǐng)先 登錄
相關(guān)推薦
評(píng)論