無畏并發
并發
- Concurrent:程式的不同部分之間獨立的執行(并發)
- Parallel:程式的不同部分同時運行(并行)
- Rust無畏并發:允許你撰寫沒有細微Bug的代碼,并在不引入新Bug的情況下易于重構
- 注意:本文中的”并發“泛指 concurrent 和 parallel
一、使用執行緒同時運行代碼(多執行緒)
行程與執行緒
- 在大部分OS里,代碼運行在行程(process)中,OS同時管理多個行程,
- 在你的程式里,各獨立部分可以同時運行,運行這些獨立部分的就是執行緒(thread)
- 多執行緒運行:
- 提升性能表現
- 增加復雜性:無法保障各執行緒的執行順序
多執行緒可導致的問題
- 競爭狀態,執行緒以不一致的順序訪問資料或資源
- 死鎖,兩個執行緒彼此等待對方使用完所持有的資源,執行緒無法繼續
- 只在某些情況下發生的 Bug,很難可靠地復制現象和修復
實作執行緒的方式
- 通過呼叫OS的API來創建執行緒:1:1模型
- 需要較小的運行時
- 語言自己實作的執行緒(綠色執行緒):M:N模型
- 需要更大的運行時
- Rust:需要權衡運行時的支持
- Rust標準庫僅提供1:1模型的執行緒
通過 spawn 創建新執行緒
- 通過 thread::spawn 函式可以創建新執行緒:
- 引數:一個閉包(在新執行緒里運行的代碼)
? cd rust
~/rust
? cargo new thread_demo
Created binary (application) `thread_demo` package
~/rust
? cd thread_demo
thread_demo on master [?] via ?? 1.67.1
? c # code .
thread_demo on master [?] via ?? 1.67.1
?
- thread::sleep 會導致當前執行緒暫停執行
use std::thread;
use std::time::Duration;
fn main() {
thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1)); // 暫停 1 毫秒
}
}
執行
thread_demo on master [?] is ?? 0.1.0 via ?? 1.67.1
? cargo run
Compiling thread_demo v0.1.0 (/Users/qiaopengjun/rust/thread_demo)
Finished dev [unoptimized + debuginfo] target(s) in 0.65s
Running `target/debug/thread_demo`
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the main thread!
hi number 2 from the spawned thread!
hi number 3 from the main thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 4 from the main thread!
hi number 5 from the spawned thread!
thread_demo on master [?] is ?? 0.1.0 via ?? 1.67.1
?
通過 join Handle 來等待所有執行緒的完成
- thread::spawn 函式的回傳值型別是 JoinHandle
- JoinHandle 持有值的所有權
- 呼叫其 join 方法,可以等待對應的其它執行緒的完成
- join 方法:呼叫 handle 的join方法會阻止當前運行執行緒的執行,直到 handle 所表示的這些執行緒終結,
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1)); // 暫停 1 毫秒
}
handle.join().unwrap();
}
執行
thread_demo on master [?] is ?? 0.1.0 via ?? 1.67.1
? cargo run
Compiling thread_demo v0.1.0 (/Users/qiaopengjun/rust/thread_demo)
Finished dev [unoptimized + debuginfo] target(s) in 0.75s
Running `target/debug/thread_demo`
hi number 1 from the main thread!
hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 2 from the main thread!
hi number 3 from the spawned thread!
hi number 3 from the main thread!
hi number 4 from the spawned thread!
hi number 4 from the main thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
thread_demo on master [?] is ?? 0.1.0 via ?? 1.67.1
等分執行緒執行完繼續執行主執行緒
use std::thread;
use std::time::Duration;
fn main() {
let handle = thread::spawn(|| {
for i in 1..10 {
println!("hi number {} from the spawned thread!", i);
thread::sleep(Duration::from_millis(1));
}
});
handle.join().unwrap();
for i in 1..5 {
println!("hi number {} from the main thread!", i);
thread::sleep(Duration::from_millis(1)); // 暫停 1 毫秒
}
}
運行
thread_demo on master [?] is ?? 0.1.0 via ?? 1.67.1
? cargo run
Compiling thread_demo v0.1.0 (/Users/qiaopengjun/rust/thread_demo)
Finished dev [unoptimized + debuginfo] target(s) in 0.28s
Running `target/debug/thread_demo`
hi number 1 from the spawned thread!
hi number 2 from the spawned thread!
hi number 3 from the spawned thread!
hi number 4 from the spawned thread!
hi number 5 from the spawned thread!
hi number 6 from the spawned thread!
hi number 7 from the spawned thread!
hi number 8 from the spawned thread!
hi number 9 from the spawned thread!
hi number 1 from the main thread!
hi number 2 from the main thread!
hi number 3 from the main thread!
hi number 4 from the main thread!
thread_demo on master [?] is ?? 0.1.0 via ?? 1.67.1
使用 move 閉包
- move 閉包通常和 thread::spawn 函式一起使用,它允許你使用其它執行緒的資料
- 創建執行緒時,把值的所有權從一個執行緒轉移到另一個執行緒
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(|| { // 報錯
println!("Here's a vector: {:?}", v);
});
// drop(v);
handle.join().unwrap();
}
修改后:
use std::thread;
fn main() {
let v = vec![1, 2, 3];
let handle = thread::spawn(move || {
println!("Here's a vector: {:?}", v);
});
// drop(v);
handle.join().unwrap();
}
二、使用訊息傳遞來跨執行緒傳遞資料
訊息傳遞
- 一種很流行且能保證安全并發的技術就是:訊息傳遞,
- 執行緒(或 Actor)通過彼此發送訊息(資料)來進行通信
- Go 語言的名言:不要用共享記憶體來通信,要用通信來共享記憶體,
- Rust:Channel(標準庫提供)
Channel
- Channel 包含: 發送端、接收端
- 呼叫發送端的方法,發送資料
- 接收端會檢查和接收到達的資料
- 如果發送端、接收端中任意一端被丟棄了,那么Channel 就”關閉“了
創建 Channel
- 使用
mpsc::channel
函式來創建 Channel- mpsc 表示 multiple producer,single consumer(多個生產者、一個消費者)
- 回傳一個 tuple(元組):里面元素分別是發送端、接收端
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
發送端的 send 方法
- 引數:想要發送的資料
- 回傳:Result<T, E>
- 如果有問題(例如接收端已經被丟棄),就回傳一個錯誤
接收端的方法
- recv 方法:阻止當前執行緒執行,直到 Channel 中有值被送來
- 一旦有值收到,就回傳 Result<T, E>
- 當發送端關閉,就會收到一個錯誤
- try_recv 方法:不會阻塞,
- 立即回傳 Result<T, E>:
- 有資料達到:回傳 Ok,里面包含著資料
- 否則,回傳錯誤
- 通常會使用回圈呼叫來檢查 try_recv 的結果
- 立即回傳 Result<T, E>:
Channel 和所有權轉移
- 所有權在訊息傳遞中非常重要:能幫你撰寫安全、并發的代碼
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let val = String::from("hi");
tx.send(val).unwrap();
println!("val is {}", val) // 報錯 借用了移動的值
});
let received = rx.recv().unwrap();
println!("Got: {}", received);
}
發送多個值,看到接收者在等待
use std::sync::mpsc;
use std::thread;
fn main() {
let (tx, rx) = mpsc::channel();
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
通過克隆創建多個發送者
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
fn main() {
let (tx, rx) = mpsc::channel();
let tx1 = mpsc::Sender::clone(&tx);
thread::spawn(move || {
let vals = vec![
String::from("1: hi"),
String::from("1: from"),
String::from("1: the"),
String::from("1: thread"),
];
for val in vals {
tx1.send(val).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
thread::spawn(move || {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("thread"),
];
for val in vals {
tx.send(val).unwrap();
thread::sleep(Duration::from_millis(1));
}
});
for received in rx {
println!("Got: {}", received);
}
}
三、共享狀態的并發
使用共享來實作并發
- Go 語言的名言:不要用共享記憶體來通信,要用通信來共享記憶體,
- Rust支持通過共享狀態來實作并發,
- Channel 類似單所有權:一旦將值的所有權轉移至 Channel,就無法使用它了
- 共享記憶體并發類似多所有權:多個執行緒可以同時訪問同一塊記憶體
使用 Mutex 來每次只允許一個執行緒來訪問資料
- Mutex 是 mutual exclusion(互斥鎖)的簡寫
- 在同一時刻,Mutex 只允許一個執行緒來訪問某些資料
- 想要訪問資料:
- 執行緒必須首先獲取互斥鎖(lock)
- lock 資料結構是 mutex 的一部分,它能跟蹤誰對資料擁有獨占訪問權
- mutex 通常被描述為:通過鎖定系統來保護它所持有的資料
- 執行緒必須首先獲取互斥鎖(lock)
Mutex 的兩條規則
- 在使用資料之前,必須嘗試獲取鎖(lock),
- 使用完 mutex 所保護的資料,必須對資料進行解鎖,以便其它執行緒可以獲取鎖,
Mutex<T>
的 API
- 通過 Mutex::new(資料) 來創建
Mutex<T>
Mutex<T>
是一個智能指標
- 訪問資料前,通過 lock 方法來獲取鎖
- 會阻塞當前執行緒
- lock 可能會失敗
- 回傳的是 MutexGuard(智能指標,實作了 Deref 和 Drop)
use std::sync::Mutex;
fn main() {
let m = Mutex::new(5);
{
let mut num = m.lock().unwrap();
*num = 6;
}
println!("m = {:?}", m);
}
多執行緒共享 Mutex<T>
use std::sync::Mutex;
use std::thread;
fn main() {
let counter = Mutex::new(0);
let mut handles = vec![];
for _ in 0..10 {
let handle = thread::spawn(move || { // 報錯 回圈 所有權
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
多執行緒的多重所有權
use std::sync::Mutex;
use std::thread;
use std::rc::Rc;
fn main() {
let counter = Rc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Rc::clone(&counter);
let handle = thread::spawn(move || { // 報錯 rc 只能用于單執行緒
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
使用 Arc<T>
來進行原子參考計數
Arc<T>
和Rc<T>
類似,它可以用于并發情景- A:atomic,原子的
- 為什么所有的基礎型別都不是原子的,為什么標準庫型別不默認使用
Arc<T>
?- 需要性能作為代價
Arc<T>
和Rc<T>
的API是相同的
use std::sync::{Mutex, Arc};
use std::thread;
fn main() {
let counter = Arc::new(Mutex::new(0));
let mut handles = vec![];
for _ in 0..10 {
let counter = Arc::clone(&counter);
let handle = thread::spawn(move || {
let mut num = counter.lock().unwrap();
*num += 1;
});
handles.push(handle);
}
for handle in handles {
handle.join().unwrap();
}
println!("Result: {}", *counter.lock().unwrap());
}
RefCell<T>
/Rc<T>
vs Muter<T>
/Arc<T>
Mutex<T>
提供了內部可變性,和 Cell 家族一樣- 我們使用
RefCell<T>
來改變Rc<T>
里面的內容 - 我們使用
Mutex<T>
來改變Arc<T>
里面的內容 - 注意:
Mutex<T>
有死鎖風險
四、通過 Send 和 Sync Trait 來擴展并發
Send 和 Sync trait
- Rust 語言的并發特性較少,目前講的并發特性都來自標準庫(而不是語言本身)
- 無需局限于標準庫的并發,可以自己實作并發
- 但在Rust語言中有兩個并發概念:
- std::marker::Sync 和 std::marker::Send 這兩個trait
Send:允許執行緒間轉移所有權
- 實作 Send trait 的型別可在執行緒間轉移所有權
- Rust中幾乎所有的型別都實作了 Send
- 但
Rc<T>
沒有實作 Send,它只用于單執行緒情景
- 但
- 任何完全由Send 型別組成的型別也被標記為 Send
- 除了原始指標之外,幾乎所有的基礎型別都是 Send
Sync:允許從多執行緒訪問
- 實作Sync的型別可以安全的被多個執行緒參考
- 也就是說:如果T是Sync,那么 &T 就是 Send
- 參考可以被安全的送往另一個執行緒
- 基礎型別都是 Sync
- 完全由 Sync 型別組成的型別也是 Sync
- 但,
Rc<T>
不是 Sync 的 RefCell<T>
和Cell<T>
家族也不是 Sync的- 而,
Mutex<T>
是Sync的
- 但,
手動來實作 Send 和 Sync 是不安全的
本文來自博客園,作者:QIAOPENGJUN,轉載請注明原文鏈接:https://www.cnblogs.com/QiaoPengjun/p/17332132.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/550524.html
標籤:其他
上一篇:推排序 Verilog實作原理