Rust async 編程
Asynchronous Programming in Rust:https://rust-lang.github.io/async-book/
中文書名《Rust 異步編程指南》:https://github.com/rustlang-cn/async-book
Rust語言圣經(Rust Course):https://course.rs/advance/async/getting-started.html
一、Getting Started
1.1 為什么使用 async
為什么使用 async
- Async 編程,是一種并發(concurrent)編程模型
- 允許你在少數系統執行緒上運行大量的并發任務
- 通過 async/await 語法,看起來和同步編程差不多
其它的并發模型
- OS 執行緒
- 無需改變任何編程模型,執行緒間同步困難,性能開銷大
- 執行緒池可以降低一些成本,但難以支撐大量 IO 系結的作業
- Event-driven 編程
- 與回呼函式一起用,可能高效
- 非線性的控制流,資料流和錯誤傳播難以追蹤
- 協程(Coroutines)
- 類似執行緒,無需改變編程模型
- 類似
async
,支持大量任務 - 抽象掉了底層細節(這對系統編程、自定義運行時的實作很重要)
- Actor 模型
- 將所有并發計算劃分為
actor
, 訊息通信易出錯 - 可以有效的實作 actor 模型,但許多實際問題沒解決(例如流程控制、重試邏輯)
- 將所有并發計算劃分為
Rust 中的 async
- Future 是惰性的
- 只有
poll
時才能取得進展, 被丟棄的future
就無法取得進展了
- 只有
- Async是零成本的
- 使用
async
,可以無需堆記憶體分配(heap allocation)和動態調度(dynamic dispatch),對性能大好,且允許在受限環境使用 async
- 使用
- 不提供內置運行時
- 運行時由Rust 社區提供,例如
tokio
- 運行時由Rust 社區提供,例如
- 單執行緒、多執行緒均支持
- 這兩者擁有各自的優缺點
Rust 中的 async 和執行緒(thread)
- OS 執行緒:
- 適用于少量任務,有記憶體和CPU開銷,且執行緒生成和執行緒間切換非常昂貴
- 執行緒池可以降低一些成本
- 允許重用同步代碼,代碼無需大改,無需特定編程模型
- 有些系統支持修改執行緒優先級
- Async:
- 顯著降低記憶體和CPU開銷
- 同等條件下,支持比執行緒多幾個數量級的任務(少數執行緒支撐大量任務)
- 可執行檔案大(需要生成狀態機,每個可執行檔案捆綁一個異步運行時)
Async 并不是比執行緒好,只是不同而已!
總結:
- 有大量
IO
任務需要并發運行時,選async
模型 - 有部分
IO
任務需要并發運行時,選多執行緒,如果想要降低執行緒創建和銷毀的開銷,可以使用執行緒池 - 有大量
CPU
密集任務需要并行運行時,例如并行計算,選多執行緒模型,且讓執行緒數等于或者稍大于CPU
核心數 - 無所謂時,統一選多執行緒
例子
如果想并發的下載檔案,你可以使用多執行緒如下實作:
fn get_two_sites() {
// Spawn two threads to do work. 創建兩個新執行緒執行任務
let thread_one = thread::spawn(|| download("https://www.foo.com"));
let thread_two = thread::spawn(|| download("https://www.bar.com"));
// Wait for both threads to complete. 等待兩個執行緒的完成
thread_one.join().expect("thread one panicked");
thread_two.join().expect("thread two panicked");
}
使用async
的方式:
async fn get_two_sites_async() {
// Create two different "futures" which, when run to completion, 創建兩個不同的`future`,你可以把`future`理解為未來某個時刻會被執行的計劃任務
// will asynchronously download the webpages. 當兩個`future`被同時執行后,它們將并發的去下載目標頁面
let future_one = download_async("https://www.foo.com");
let future_two = download_async("https://www.bar.com");
// Run both futures to completion at the same time. 同時運行兩個`future`,直至完成
join!(future_one, future_two);
}
自定義并發模型
- 除了執行緒和async,還可以用其它的并發模型(例如 event-driven)
1.2 Rust Async 的目前狀態
Async Rust 目前的狀態
- 部分穩定,部分仍在變化,
- 特點:
- 針對典型并發任務,性能出色
- 與高級語言特性頻繁互動(生命周期、pinning)
- 同步和異步代碼間、不同運行時的異步代碼間存在兼容性約束
- 由于不斷進化,維護負擔更重
語言和庫的支持
- 雖然Rust本身就支持Async編程,但很多應用依賴與社區的庫:
- 標準庫提供了最基本的特性、型別和功能,例如 Future trait
- async/await 語法直接被Rust編譯器支持
- futures crate 提供了許多實用型別、宏和函式,它們可以用于任何異步應用程式,
- 異步代碼、IO 和任務生成的執行由 "async runtimes" 提供,例如 Tokio 和 async-std,大多數async 應用程式和一些 async crate 都依賴于特定的運行時,
注意
- Rust 不允許你在 trait 里宣告 async 函式
編譯和除錯
- 編譯錯誤:
- 由于
async
通常依賴于更復雜的語言功能,例如生命周期和Pinning
,因此可能會更頻繁地遇到這些型別的錯誤,
- 由于
- 運行時錯誤:
- 每當運行時遇到異步函式,編譯器會在后臺生成一個狀態機,Stack traces 里有其明細,以及運行時呼叫的函式,因此解釋起來更復雜,
- 新的失效模式:
- 可能出現一些新的故障,它們可以通過編譯,甚至單元測驗,
兼容性考慮
-
async和同步代碼不能總是自由組合
- 例如,不能直接從同步函式來呼叫
async
異步函式
- 例如,不能直接從同步函式來呼叫
-
Async 代碼間也不總是能自由組合
- 一些crate依賴于特定的
async
運行時
- 一些crate依賴于特定的
-
因此,盡早研究確定使用哪個 async 運行時
性能特征
async
的性能依賴于運行時的表現(通常較出色)
1.3 async/await 入門
async
async
把一段代碼轉化為一個實作了Future trait
的狀態機- 雖然在同步方法中呼叫阻塞函式會阻塞整個執行緒,但阻塞的Future將放棄對執行緒的控制,從而允許其它
Future
來運行,
~/rust via ?? base
? cargo new async_demo
Created binary (application) `async_demo` package
~/rust via ?? base
? cd async_demo
async_demo on master [?] via ?? 1.67.1 via ?? base
? c
async_demo on master [?] via ?? 1.67.1 via ?? base
?
代碼
#![allow(unused)]
fn main() {
async fn do_something() {/* ... */}
}
Cargo .toml
[package]
name = "async_demo"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
futures = "0.3.28"
async fn
- 異步函式語法:
async fn do_something() {/* ... */}
async fn
回傳的是 Future,Future 需要由一個執行者來運行
futures::executor::block_on;
- block_on 阻塞當前執行緒,直到提供的 Future 運行完成
- 其它執行者提供更復雜的行為,例如將多個 Future 安排到同一個執行緒上
use futures::executor::block_on;
async fn hello_world() {
println!("Hello world!");
}
fn main() {
let future = hello_world(); // 什么都沒列印出來
block_on(future); // `future` 運行,并列印出 "Hello world!"
}
Await
- 在 async fn 中,可以使用 .await 來等待另一個實作 Future trait 的完成
- 與 block_on 不同,
.await
不會阻塞當前執行緒,而是異步的等待 Future 的完成(如果該 Future 目前無法取得進展,就允許其他任務執行)
use futures::executor::block_on;
struct Song {}
async fn learn_song() -> Song {
Song {}
}
async fn sing_song(song: Song) { /* ... */
}
async fn dance() { /* ... */
}
fn main() {
let song = block_on(learn_song());
block_on(sing_song(song));
block_on(dance());
}
修改之后
use futures::executor::block_on;
struct Song {}
async fn learn_song() -> Song {
Song {}
}
async fn sing_song(song: Song) {}
async fn dance() {}
async fn learn_and_sing() {
let song = learn_song().await;
sing_song(song).await;
}
async fn async_main() {
let f1 = learn_and_sing();
let f2 = dance();
futures::join!(f1, f2);
}
fn main() {
block_on(async_main());
}
二、幕后原理:執行 Future 和任務
2.1 Future trait
Future trait
Future
trait是 Rust Async異步編程的核心Future
是一種異步計算,它可以產生一個值- 實作了 Future 的型別表示目前可能還不可用的值
下面是一個簡化版的 Future trait:
trait SimpleFuture {
type Output;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}
enum Poll<T> {
Ready(T),
Pending,
}
-
Future 可以表示:
-
下一次網路資料包的到來
-
下一次滑鼠的移動
-
或者僅僅是經過一段時間的時間點
-
-
Future
代表著一種你可以檢驗其是否完成的操作 -
Future
可以通過呼叫 poll 函式來取得進展- poll 函式會驅動 Future 盡可能接近完成
- 如果
Future
完成了:就回傳poll::Ready(result)
,其中 result 就是最終的結果 - 如果
Future
還無法完成:就回傳poll::Pending
,并當 Future 準備好取得更多進展時呼叫一個waker
的wake() 函式
-
針對
Future
,你唯一能做的就是使用 poll 來敲它,直到一個值掉出來
wake() 函式
- 當 wake() 函式被呼叫時:
- 執行器將驅動 Future 再次呼叫 poll 函式,以便 Future 能取得更多的進展
- 沒有wake() 函式,執行器就不知道特定的 Future 何時能取得進展(就得不斷的 poll)
- 通過 wake() 函式,執行器就確切的知道哪些 Future 已準備好進行 poll() 的呼叫
例子
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// The socket has data -- read it into a buffer and return it. socket有資料,寫入buffer中并回傳
Poll::Ready(self.socket.read_buf())
} else {
// The socket does not yet have data. socket中還沒資料
//
// Arrange for `wake` to be called once data is available. 注冊一個`wake`函式,當資料可用時,該函式會被呼叫,
// When data becomes available, `wake` will be called, and the
// user of this `Future` will know to call `poll` again and
// receive data. 然后當前Future的執行器會再次呼叫`poll`方法,此時就可以讀取到資料
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
例子
- 組合多個異步操作,而無需中間分配
- 可以通過無分配的狀態機來實作多個 Future 同時運行或串聯運行
/// A SimpleFuture that runs two other futures to completion concurrently. 它會并發地運行兩個Future直到它們完成
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace. 之所以可以并發,是因為兩個Future的輪詢可以交替進行,一個阻塞,另一個就可以立刻執行,反之亦然
pub struct Join<FutureA, FutureB> {
// Each field may contain a future that should be run to completion. 結構體的每個欄位都包含一個Future,可以運行直到完成.
// If the future has already completed, the field is set to `None`.
// This prevents us from polling a future after it has completed, which
// would violate the contract of the `Future` trait. 等到Future完成后,欄位會被設定為 `None`. 這樣Future完成后,就不會再被輪詢
a: Option<FutureA>,
b: Option<FutureB>,
}
impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
// Attempt to complete future `a`. 嘗試去完成一個 Future `a`
if let Some(a) = &mut self.a {
if let Poll::Ready(()) = a.poll(wake) {
self.a.take();
}
}
// Attempt to complete future `b`. 嘗試去完成一個 Future `b`
if let Some(b) = &mut self.b {
if let Poll::Ready(()) = b.poll(wake) {
self.b.take();
}
}
if self.a.is_none() && self.b.is_none() {
// Both futures have completed -- we can return successfully 兩個 Future都已完成 - 我們可以成功地回傳了
Poll::Ready(())
} else {
// One or both futures returned `Poll::Pending` and still have 至少還有一個 Future 沒有完成任務,因此回傳 `Poll::Pending`.
// work to do. They will call `wake()` when progress can be made. 當該 Future 再次準備好時,通過呼叫`wake()`函式來繼續執行
Poll::Pending
}
}
}
例子
多個連續的 Future 可以一個接一個的運行
/// A SimpleFuture that runs two futures to completion, one after another. 一個SimpleFuture, 它使用順序的方式,一個接一個地運行兩個Future
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both 注意: 由于本例子用于演示,因此功能簡單,`AndThenFut` 會假設兩個 Future 在創建時就可用了.
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`. 而真實的`Andthen`允許根據第一個`Future`的輸出來創建第二個`Future`,因此復雜的多,
pub struct AndThenFut<FutureA, FutureB> {
first: Option<FutureA>,
second: FutureB,
}
impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
FutureA: SimpleFuture<Output = ()>,
FutureB: SimpleFuture<Output = ()>,
{
type Output = ();
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if let Some(first) = &mut self.first {
match first.poll(wake) {
// We've completed the first future -- remove it and start on
// the second! 我們已經完成了第一個 Future, 可以將它移除, 然后準備開始運行第二個
Poll::Ready(()) => self.first.take(),
// We couldn't yet complete the first future. 第一個 Future 還不能完成
Poll::Pending => return Poll::Pending,
};
}
// Now that the first future is done, attempt to complete the second. 運行到這里,說明第一個Future已經完成,嘗試去完成第二個
self.second.poll(wake)
}
}
真正的 Future trait
trait Future {
type Output;
fn poll(
// Note the change from `&mut self` to `Pin<&mut Self>`: 首先值得注意的地方是,`self`的型別從`&mut self`變成了`Pin<&mut Self>`:
self: Pin<&mut Self>,
// and the change from `wake: fn()` to `cx: &mut Context<'_>`: 其次將`wake: fn()` 修改為 `cx: &mut Context<'_>`:
cx: &mut Context<'_>,
) -> Poll<Self::Output>;
}
-
self 型別不再是
&mut self
,而是pin<&mut self>
- 它允許我們創建不可移動的 future
-
不可移動的物件可以在它們的欄位間存盤指標
-
需要啟用
async/await
,Pin
就是必須的 -
wake: fn()
變成了&mut Context<'_>
, -
在 SimpleFuture 里:
- 我們通過呼叫函式指標 (fn()) 來告訴 Future 的執行器:相關的 Future 應該被 poll 了,
- 由于 fn() 是一個函式指標,它不能存盤任何關于哪個 Future呼叫了 wake 的資料
-
Context 型別提供了訪問 Waker型別的值的方式,這些值可以被用來 wake up 特定的任務
- 例如,實際專案中Web Server可能有上千個不同的連接,它們的wakeup 應單獨管理
總之,在正式場景要進行 wake
,就必須攜帶上資料, 而 Context
型別通過提供一個 Waker
型別的值,就可以用來喚醒特定的的任務,
2.2 使用 Waker 喚醒任務
Waker 型別的作用
- Future在第一次poll的時候通常無法完成任務,所以Future需要保證在準備好取得更多進展后,可以再次被 poll
- 每次 Future 被 poll,它都是作為一個任務的一部分
- 任務(Task)就是被提交給執行者的頂層的 Future
Waker 型別
- Waker 提供了 wake() 方法,它可以被用來告訴執行者:相關的任務應該被喚醒
- 當 wake() 被呼叫,執行者知道 Waker 所關聯的任務已經準備好取得更多進展,Future應該被再次 poll
- Waker 實作了 clone(),可以復制和存盤
例子
- 使用 Waker 實作一個簡單的計時器 Future
- 構建一個定時器
~/rust via ?? base
? cargo new timer_future
Created binary (application) `timer_future` package
~/rust via ?? base
? cd timer_future
timer_future on master [?] via ?? 1.67.1 via ?? base
? c
lib.rs 檔案
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// Shared state between the future and the waiting thread
/// 在Future和等待的執行緒間共享狀態
struct SharedState {
/// Whether or not the sleep time has elapsed
/// 定時(睡眠)是否結束
completed: bool,
/// The waker for the task that `TimerFuture` is running on.
/// The thread can use this after setting `completed = true` to tell
/// `TimerFuture`'s task to wake up, see that `completed = true`, and
/// move forward.
/// 當睡眠結束后,執行緒可以用`waker`通知`TimerFuture`來喚醒任務
waker: Option<Waker>,
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
// Look at the shared state to see if the timer has already completed.
// 通過檢查共享狀態,來確定定時器是否已經完成
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
Poll::Ready(())
} else {
// Set waker so that the thread can wake up the current task
// when the timer has completed, ensuring that the future is polled
// again and sees that `completed = true`.
//
// It's tempting to do this once rather than repeatedly cloning
// the waker each time. However, the `TimerFuture` can move between
// tasks on the executor, which could cause a stale waker pointing
// to the wrong task, preventing `TimerFuture` from waking up
// correctly.
//
// N.B. it's possible to check for this using the `Waker::will_wake`
// function, but we omit that here to keep things simple.
// 設定`waker`,這樣新執行緒在睡眠(計時)結束后可以喚醒當前的任務,接著再次對`Future`進行`poll`操作,
//
// 下面的`clone`每次被`poll`時都會發生一次,實際上,應該是只`clone`一次更加合理,
// 選擇每次都`clone`的原因是: `TimerFuture`可以在執行器的不同任務間移動,如果只克隆一次,
// 那么獲取到的`waker`可能已經被篡改并指向了其它任務,最終導致執行器運行了錯誤的任務
shared_state.waker = Some(cx.waker().clone());
Poll::Pending
}
}
}
impl TimerFuture {
/// Create a new `TimerFuture` which will complete after the provided
/// timeout.
/// 創建一個新的`TimerFuture`,在指定的時間結束后,該`Future`可以完成
pub fn new(duration: Duration) -> Self {
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// Spawn the new thread 創建新執行緒
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
thread::sleep(duration); // 睡眠指定時間實作計時功能
let mut shared_state = thread_shared_state.lock().unwrap();
// Signal that the timer has completed and wake up the last
// task on which the future was polled, if one exists.
// 通知執行器定時器已經完成,可以繼續`poll`對應的`Future`了
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
waker.wake()
}
});
TimerFuture { shared_state }
}
}
2.3 構建一個執行器(Executor)
Future 執行者(Executor)
- Future 是惰性的:除非驅動它們來完成,否則就什么都不做
- 一種驅動方式是在 async 函式里使用 .await,但這只是把問題推到了上一層面
- 誰來執行頂層 async 函式回傳的 Future?
- 需要的是一個 Future 執行者
- Future 執行者會獲取一系列頂層的 Future,通過在 Future 可以有進展的時候呼叫 poll,來將這些 Future 運行至完成
- 通常執行者首先會對 Future進行 poll 一次,以便開始
- 當 Future 通過呼叫 wake() 表示它們已經準備好取得進展時,它們就會被放回到一個佇列里,然后 poll 再次被呼叫,重復此操作直到 Future 完成
例子
- 構建簡單的執行者,可以運行大量的頂層 Future 來并發的完成
- 需要使用 future crate 的 ArcWake trait:
- 它提供了簡單的方式用來組建 Waker
lib.rs 檔案
use std::{
future::Future,
pin::Pin,
sync::{Arc, Mutex},
task::{Context, Poll, Waker},
thread,
time::Duration,
};
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
}
/// Shared state between the future and the waiting thread
/// 在Future和等待的執行緒間共享狀態
struct SharedState {
/// Whether or not the sleep time has elapsed
/// 定時(睡眠)是否結束
completed: bool,
/// The waker for the task that `TimerFuture` is running on.
/// The thread can use this after setting `completed = true` to tell
/// `TimerFuture`'s task to wake up, see that `completed = true`, and
/// move forward.
/// 當睡眠結束后,執行緒可以用`waker`通知`TimerFuture`來喚醒任務
waker: Option<Waker>,
}
impl Future for TimerFuture {
type Output = ();
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
println!("[{:?}] Polling TimerFuture...", thread::current().id());
// Look at the shared state to see if the timer has already completed.
// 通過檢查共享狀態,來確定定時器是否已經完成
let mut shared_state = self.shared_state.lock().unwrap();
if shared_state.completed {
println!("[{:?}] TimerFuture completed...", thread::current().id());
Poll::Ready(())
} else {
// Set waker so that the thread can wake up the current task
// when the timer has completed, ensuring that the future is polled
// again and sees that `completed = true`.
//
// It's tempting to do this once rather than repeatedly cloning
// the waker each time. However, the `TimerFuture` can move between
// tasks on the executor, which could cause a stale waker pointing
// to the wrong task, preventing `TimerFuture` from waking up
// correctly.
//
// N.B. it's possible to check for this using the `Waker::will_wake`
// function, but we omit that here to keep things simple.
// 設定`waker`,這樣新執行緒在睡眠(計時)結束后可以喚醒當前的任務,接著再次對`Future`進行`poll`操作,
//
// 下面的`clone`每次被`poll`時都會發生一次,實際上,應該是只`clone`一次更加合理,
// 選擇每次都`clone`的原因是: `TimerFuture`可以在執行器的不同任務間移動,如果只克隆一次,
// 那么獲取到的`waker`可能已經被篡改并指向了其它任務,最終導致執行器運行了錯誤的任務
shared_state.waker = Some(cx.waker().clone());
println!("[{:?}] TimerFuture pending...", thread::current().id());
Poll::Pending
}
}
}
impl TimerFuture {
/// Create a new `TimerFuture` which will complete after the provided
/// timeout.
/// 創建一個新的`TimerFuture`,在指定的時間結束后,該`Future`可以完成
pub fn new(duration: Duration) -> Self {
println!("[{:?}] 開始創建新的 TimerFuture...", thread::current().id());
let shared_state = Arc::new(Mutex::new(SharedState {
completed: false,
waker: None,
}));
// Spawn the new thread 創建新執行緒
let thread_shared_state = shared_state.clone();
thread::spawn(move || {
println!(
"[{:?}] TimerFuture 生成新的執行緒并開始睡眠...",
thread::current().id()
);
thread::sleep(duration); // 睡眠指定時間實作計時功能
let mut shared_state = thread_shared_state.lock().unwrap();
// Signal that the timer has completed and wake up the last
// task on which the future was polled, if one exists.
// 通知執行器定時器已經完成,可以繼續`poll`對應的`Future`了
shared_state.completed = true;
if let Some(waker) = shared_state.waker.take() {
println!(
"[{:?}] TimerFuture 新執行緒獲得 waker,并進行 wake()...",
thread::current().id()
);
waker.wake()
} else {
println!(
"[{:?}] TimerFuture 新執行緒沒獲得 waker...",
thread::current().id()
);
}
});
println!("[{:?}] 回傳新的 TimerFuture...", thread::current().id());
TimerFuture { shared_state }
}
}
main.rs 檔案
use futures::{
future::{BoxFuture, FutureExt},
task::{waker_ref, ArcWake},
};
use std::{
thread,
future::Future,
sync::mpsc::{sync_channel, Receiver, SyncSender},
sync::{Arc, Mutex},
task::Context,
time::Duration,
};
// The timer we wrote in the previous section: 引入之前實作的定時器模塊
use timer_future::TimerFuture;
/// Task executor that receives tasks off of a channel and runs them.
/// 任務執行器,負責從通道中接收任務然后執行
struct Executor {
ready_queue: Receiver<Arc<Task>>,
}
/// `Spawner` spawns new futures onto the task channel.
/// `Spawner`負責創建新的`Future`然后將它發送到任務通道中
#[derive(Clone)]
struct Spawner {
task_sender: SyncSender<Arc<Task>>,
}
/// A future that can reschedule itself to be polled by an `Executor`.
/// 一個Future,它可以調度自己(將自己放入任務通道中),然后等待執行器去`poll`
struct Task {
/// In-progress future that should be pushed to completion. 進行中的Future,在未來的某個時間點會被完成
///
/// The `Mutex` is not necessary for correctness, since we only have 按理來說`Mutex`在這里是多余的,因為我們只有一個執行緒來執行任務,但是由于
/// one thread executing tasks at once. However, Rust isn't smart
/// enough to know that `future` is only mutated from one thread, Rust并不聰明,它無法知道`Future`只會在一個執行緒內被修改,并不會被跨執行緒修改,因此
/// so we need to use the `Mutex` to prove thread-safety. A production 我們需要使用`Mutex`來滿足這個笨笨的編譯器對執行緒安全的執著,
/// executor would not need this, and could use `UnsafeCell` instead.
/// 如果是生產級的執行器實作,不會使用`Mutex`,因為會帶來性能上的開銷,取而代之的是使用`UnsafeCell`
future: Mutex<Option<BoxFuture<'static, ()>>>,
/// Handle to place the task itself back onto the task queue.
/// 可以將該任務自身放回到任務通道中,等待執行器的poll
task_sender: SyncSender<Arc<Task>>,
}
fn new_executor_and_spawner() -> (Executor, Spawner) {
// Maximum number of tasks to allow queueing in the channel at once. 任務通道允許的最大緩沖數(任務佇列的最大長度)
// This is just to make `sync_channel` happy, and wouldn't be present in
// a real executor. 當前的實作僅僅是為了簡單,在實際的執行中,并不會這么使用
const MAX_QUEUED_TASKS: usize = 10_000;
let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
println!("[{:?}] 生成 Executor 和 Spawner (含發送端、接收端)...", thread::current().id());
(Executor { ready_queue }, Spawner { task_sender })
}
impl Spawner {
// 把 Future 包裝成任務發送到通道
fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
let future = future.boxed();
let task = Arc::new(Task {
future: Mutex::new(Some(future)),
task_sender: self.task_sender.clone(),
});
println!("[{:?}] 將 Future 組成 Task,放入 Channel...", thread::current().id());
self.task_sender.send(task).expect("too many tasks queued");
}
}
impl ArcWake for Task {
fn wake_by_ref(arc_self: &Arc<Self>) {
// Implement `wake` by sending this task back onto the task channel
// so that it will be polled again by the executor.
// 通過發送任務到任務管道的方式來實作`wake`,這樣`wake`后,任務就能被執行器`poll`
println!("[{:?}] wake_by_ref...", thread::current().id());
let cloned = arc_self.clone();
arc_self
.task_sender
.send(cloned)
.expect("too many tasks queued");
}
}
impl Executor {
fn run(&self) {
println!("[{:?}] Executor running...", thread::current().id());
while let Ok(task) = self.ready_queue.recv() { // 從通道不斷接收任務
println!("[{:?}] 接收到任務...", thread::current().id());
// Take the future, and if it has not yet completed (is still Some),
// poll it in an attempt to complete it. 獲取一個future,若它還沒有完成(仍然是Some,不是None),則對它進行一次poll并嘗試完成它
let mut future_slot = task.future.lock().unwrap();
if let Some(mut future) = future_slot.take() {
println!("[{:?}] 從任務中取得 Future...", thread::current().id());
// Create a `LocalWaker` from the task itself 基于任務自身創建一個 `LocalWaker`
let waker = waker_ref(&task);
println!("[{:?}] 獲得 waker by ref...", thread::current().id());
let context = &mut Context::from_waker(&waker);
// `BoxFuture<T>` is a type alias for #`BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的型別別名
// `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
// We can get a `Pin<&mut dyn Future + Send + 'static>`
// from it by calling the `Pin::as_mut` method.
// 通過呼叫`as_mut`方法,可以將上面的型別轉換成`Pin<&mut dyn Future + Send + 'static>`
println!("[{:?}] 獲得 context,準備進行 poll()...", thread::current().id());
if future.as_mut().poll(context).is_pending() {
// We're not done processing the future, so put it
// back in its task to be run again in the future. Future還沒執行完,因此將它放回任務中,等待下次被poll
*future_slot = Some(future);
println!("[{:?}] Poll::Pending ====", thread::current().id());
} else {
println!("[{:?}] Poll::Ready....", thread::current().id());
}
}
}
println!("[{:?}] Excutor run 結束", thread::current().id());
}
}
fn main() {
// 回傳一個執行者和一個任務的生成器
let (executor, spawner) = new_executor_and_spawner();
// Spawn a task to print before and after waiting on a timer. 生成一個任務 async塊是一個Future
spawner.spawn(async {
println!("[{:?}] howdy!", thread::current().id());
// Wait for our timer future to complete after two seconds. 創建定時器Future,并等待它完成
TimerFuture::new(Duration::new(2, 0)).await;
println!("[{:?}] done!", thread::current().id());
});
// Drop the spawner so that our executor knows it is finished and won't
// receive more incoming tasks to run. drop掉任務,這樣執行器就知道任務已經完成,不會再有新的任務進來
println!("[{:?}] drop Spawner!", thread::current().id());
drop(spawner);
// Run the executor until the task queue is empty. 運行執行器直到任務佇列為空
// This will print "howdy!", pause, and then print "done!". 任務運行后,會先列印`howdy!`, 暫停2秒,接著列印 `done!`
executor.run();
}
運行
timer_future on master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base took 2.9s
? cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.06s
Running `target/debug/timer_future`
[ThreadId(1)] 生成 Executor 和 Spawner (含發送端、接收端)...
[ThreadId(1)] 將 Future 組成 Task,放入 Channel...
[ThreadId(1)] drop Spawner!
[ThreadId(1)] Executor running...
[ThreadId(1)] 接收到任務...
[ThreadId(1)] 從任務中取得 Future...
[ThreadId(1)] 獲得 waker by ref...
[ThreadId(1)] 獲得 context,準備進行 poll()...
[ThreadId(1)] howdy!
[ThreadId(1)] 開始創建新的 TimerFuture...
[ThreadId(1)] 回傳新的 TimerFuture...
[ThreadId(1)] Polling TimerFuture...
[ThreadId(1)] TimerFuture pending...
[ThreadId(1)] Poll::Pending ====
[ThreadId(2)] TimerFuture 生成新的執行緒并開始睡眠...
[ThreadId(2)] TimerFuture 新執行緒獲得 waker,并進行 wake()...
[ThreadId(2)] wake_by_ref...
[ThreadId(1)] 接收到任務...
[ThreadId(1)] 從任務中取得 Future...
[ThreadId(1)] 獲得 waker by ref...
[ThreadId(1)] 獲得 context,準備進行 poll()...
[ThreadId(1)] Polling TimerFuture...
[ThreadId(1)] TimerFuture completed...
[ThreadId(1)] done!
[ThreadId(1)] Poll::Ready....
[ThreadId(1)] Excutor run 結束
timer_future on master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base took 2.6s
?
2.4 執行器和系統IO
Future
在 Socket
上執行異步讀取
- 這個 Future 會讀取 socket 上可用的資料
- 如果沒有資料,它就屈服于執行者:請求當 socket 再次可讀時,喚醒它的任務
- 本例中我們不知道 Socket 型別是如何實作的,尤其不知道 set_readable_callback 函式如何作業,那么如何在 socket 再次可讀時安排呼叫 wake() 呢?
- 一種辦法是使用一個執行緒不斷檢查 socket 是否可讀(低效!)
pub struct SocketRead<'a> {
socket: &'a Socket,
}
impl SimpleFuture for SocketRead<'_> {
type Output = Vec<u8>;
fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
if self.socket.has_data_to_read() {
// The socket has data -- read it into a buffer and return it.
// socket有資料,寫入buffer中并回傳
Poll::Ready(self.socket.read_buf())
} else {
// The socket does not yet have data. socket中還沒資料
//
// Arrange for `wake` to be called once data is available.
// When data becomes available, `wake` will be called, and the
// user of this `Future` will know to call `poll` again and
// receive data.
// 注冊一個`wake`函式,當資料可用時,該函式會被呼叫,
// 然后當前Future的執行器會再次呼叫`poll`方法,此時就可以讀取到資料
self.socket.set_readable_callback(wake);
Poll::Pending
}
}
}
- 實際中,這個問題通過與IO感知的系統阻塞原語(primitive)的集成來解決
- 例如
Linux
中的epoll
,FreeBSD
和macOS
中的kqueue
,Windows
中的IOCP
,Fuchisa
中的ports
等 - 所有這些都是通過 Rust 跨平臺包的
mio crate
來暴露的
- 例如
- 這些原語(primitive)都允許執行緒阻塞多個異步 IO 事件,并在其中一個事件完成后回傳,
struct IoBlocker {
/* ... */
}
struct Event {
// An ID uniquely identifying the event that occurred and was listened for.
// Event的唯一ID,該事件發生后,就會被監聽起來
id: usize,
// A set of signals to wait for, or which occurred.
// 一組需要等待或者已發生的信號
signals: Signals,
}
impl IoBlocker {
/// Create a new collection of asynchronous IO events to block on.
/// 創建需要阻塞等待的異步IO事件的集合
fn new() -> Self { /* ... */ }
/// Express an interest in a particular IO event.
/// 對指定的IO事件表示興趣
fn add_io_event_interest(
&self,
/// The object on which the event will occur
/// 事件所系結的socket
io_object: &IoObject,
/// A set of signals that may appear on the `io_object` for
/// which an event should be triggered, paired with
/// an ID to give to events that result from this interest.
event: Event,
) { /* ... */ }
/// Block until one of the events occurs.
/// 進入阻塞,直到某個事件出現
fn block(&self) -> Event { /* ... */ }
}
let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
&socket_1,
Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
&socket_2,
Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();
// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
// 當socket的資料可以讀取時,列印 "Socket 1 is now READABLE"
println!("Socket {:?} is now {:?}", event.id, event.signals);
Socket::set_readable_callback 的偽代碼
- Future 執行者可以使用這些原語提供異步 IO 物件,例如 socket,它就可以當特定 IO 事件發生時通過配置回呼來運行
- 針對我們 SocketRead 例子,Socket::set_readable_callback 的偽代碼大致如下:
impl Socket {
fn set_readable_callback(&self, waker: Waker) {
// `local_executor` is a reference to the local executor.
// this could be provided at creation of the socket, but in practice
// many executor implementations pass it down through thread local
// storage for convenience.
let local_executor = self.local_executor;
// Unique ID for this IO object.
let id = self.id;
// Store the local waker in the executor's map so that it can be called
// once the IO event arrives.
local_executor.event_map.insert(id, waker);
local_executor.add_io_event_interest(
&self.socket_file_descriptor,
Event { id, signals: READABLE },
);
}
}
現在,我們就只有一個執行者執行緒,它可以接收 IO事件,并將它們分配到適合的 Waker,這將喚醒相應的任務,并允許執行者在回傳檢查更多的IO事件之前,驅動更多的任務完成(回圈繼續...),
這樣,我們只需要一個執行器執行緒,它會接收IO事件并將其分發到對應的 Waker
中,接著后者會喚醒相關的任務,最終通過執行器 poll
后,任務可以順利的繼續執行, 這種IO讀取流程可以不停的回圈,直到 socket
關閉,
三、async & .await
什么是 async/.await
- async/.await 是 Rust 的特殊語法,在發生阻塞時,它讓放棄當前執行緒的控制權成為可能,這就允許在等待操作完成的時候,允許其它代碼取得進展
使用 async 的兩種方式
有兩種方式可以使用async
: async fn
用于宣告函式,async { ... }
用于宣告陳述句塊,它們會回傳一個實作 Future
特征的值:
- async 和 async blocks:
- 都回傳實作了 Future trait 的值
- async 體和其它 future都是惰性的:
- 在真正運行之前什么都不做
- 使用
.await
是最常見的運行future 的方式:- 對 future 使用
.await
就會嘗試驅動Future運行至完成
- 對 future 使用
- 如果 Future 被阻塞:
- 它會放棄當前執行緒的控制權
- 當可取得更多進展時,執行器會撿起這個 Future 并恢復執行,最終由
.await
完成決議
// `foo()` returns a type that implements `Future<Output = u8>`.
// `foo().await` will result in a value of type `u8`.
// `foo()`回傳一個`Future<Output = u8>`,
// 當呼叫`foo().await`時,該`Future`將被運行,當呼叫結束后我們將獲取到一個`u8`值
async fn foo() -> u8 { 5 }
fn bar() -> impl Future<Output = u8> {
// This `async` block results in a type that implements
// `Future<Output = u8>`.
// 下面的`async`陳述句塊回傳`Future<Output = u8>`
async {
let x: u8 = foo().await;
x + 5
}
}
例子
use async_std::io::prelude::*;
use async_std::net;
use async_std::task;
// 異步函式以 async fn 開始 里面由3個異步函式 .await
// 無需調整 async fn 的回傳型別,Rust 自動把它當成相應的 Future 型別
// 回傳的 Future 包含所需相關資訊:引數、本地變數空間...
// Future 的具體型別由編譯器基于函式體和引數自動生成
// 該型別沒有名稱
// 它實作了 Future<Output=R>
// 第一次對 cheapo_request 進行 poll 時:
// 從函式體頂部開始執行
// 直到第一個 await(針對 TcpStream::connect 回傳的 Future)
// 隨著 cheapo_request 的 Future 不斷被 poll,其執行就是從一個 await 到下一個 await,而且只有子 Future變成 Ready 之后才繼續
// cheapo_reauest 的 Future 會追蹤:
// 下一次 poll 應恢復繼續的那個店
// 以及所需的本地狀態(變數、引數、臨時變數等)
// 這種途中能暫停執行,然后恢復執行的能力是 async 所獨有的
// 由于 await 運算式依賴于“可恢復執行”這個特性,所以 await 只能用在 async 里
// 暫停執行時執行緒在做什么?
// 它不是在干等,而是在做其它作業,
async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String> {
// .await 會等待,直到 future 變成 ready, await 最侄訓決議出 future 的值
// connect 當呼叫 async 函式時,在其函式體執行前,它就會立即回傳
// 這個 await 運算式會對 connect 的Future進行 poll:
// 如果沒完成 -> 回傳 Pending
// 針對 cheapo_request 的 poll 也無法繼續,
// 直到 connect 的 Future 回傳 Ready
let mut socket = net::TcpStream::connect((host, port)).await?;
let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
socket.write_all(request.as_bytes()).await?;
socket.shutdown(net::Shutdown::Write)?;
let mut response = String::new();
socket.read_to_string(&mut response).await?;
Ok(response)
}
fn main() -> std::io::Result<()> {
// 注意:
// 下一次對 cheapo_request 的 Future 進行 poll 時:
// 并不在函式體頂部開始執行
// 它會在 connect Future 進行 poll 的地方繼續執行
// 直到它變成 Ready,才會繼續在函式體往下走
let response = task::block_on(cheapo_request("example.com", 80, "/"))?;
println!("{}", response);
Ok(())
}
- await:
- 獲得 Future 的所有權,并對其進行 poll
- 如果 Future Ready,其最終值就是 await 運算式的值,這時執行就可以繼續了
- 否則就回傳 Pending 給呼叫者
async 的生命周期
- 與傳統函式不同:async fn,如果它的引數是參考或是其它非 'static 的,那么它回傳的 Future 就會系結到引數的生命周期上,
- 這意味著 async fn 回傳的 future,在 .await 的同時,fn 的非 'static 的引數必須保持有效
// This function:
async fn foo(x: &u8) -> u8 { *x }
// Is equivalent to this function:
// 上面的函式跟下面的函式是等價的:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
async move { *x }
}
存盤 future 或傳遞 future
- 通常,async 的函式在呼叫后會立即 .await,這就不是問題:
- 例如:
foo(&x).await
- 例如:
- 如果存盤 future 或將其傳遞給其它任務或執行緒,就有問題了...
- 一種變通解決辦法:
- 思路:把使用參考作為引數的 async fn 轉為一個 'static future
- 做法:在 async 塊里,將引數和 async fn 的 呼叫捆綁到一起(延長引數的生命周期來匹配 future)
fn bad() -> impl Future<Output = u8> {
let x = 5;
borrow_x(&x) // ERROR: `x` does not live long enough
}
// 將引數和對 async fn 的呼叫放在同一個 async 陳述句塊
fn good() -> impl Future<Output = u8> {
async {
let x = 5;
borrow_x(&x).await
}
}
以上代碼會報錯,因為 x
的生命周期只到 bad
函式的結尾, 但是 Future
顯然會活得更久
通過將引數移動到 async
陳述句塊內, 將它的生命周期擴展到 'static
, 并跟回傳的 Future
保持了一致,
async move
- async 塊和閉包都支持 move
- async move 塊會獲得其參考變數的所有權:
- 允許其比當前所在的作用域活得長
- 但同時也放棄了與其它代碼共享這些變數的能力
/// `async` block:
///
/// Multiple different `async` blocks can access the same local variable
/// so long as they're executed within the variable's scope
// 多個不同的 `async` 陳述句塊可以訪問同一個本地變數,只要它們在該變數的作用域內執行
async fn blocks() {
let my_string = "foo".to_string();
let future_one = async {
// ...
println!("{my_string}");
};
let future_two = async {
// ...
println!("{my_string}");
};
// Run both futures to completion, printing "foo" twice:
// 運行兩個 Future 直到完成
let ((), ()) = futures::join!(future_one, future_two);
}
/// `async move` block:
///
/// Only one `async move` block can access the same captured variable, since
/// captures are moved into the `Future` generated by the `async move` block.
/// However, this allows the `Future` to outlive the original scope of the
/// variable:
// 由于`async move`會捕獲環境中的變數,因此只有一個`async move`陳述句塊可以訪問該變數,
// 但是它也有非常明顯的好處: 變數可以轉移到回傳的 Future 中,不再受借用生命周期的限制
fn move_block() -> impl Future<Output = ()> {
let my_string = "foo".to_string();
async move {
// ...
println!("{my_string}");
}
}
在多執行緒執行者上進行 .await
- 當使用多執行緒 future 執行者時,future 就可以在執行緒間移動:
- 所以 async 體里面用的變數必須能夠在執行緒間移動
- 因為任何的 .await 都可能導致切換到一個新執行緒
- 這意味著使用以下型別時不安全的:
- Rc、&RefCell 和任何其它沒有實作 Send trait 的型別,包括沒實作 Sync trait 的參考
- 注意:呼叫 .await 時,只要這些型別不在作用域內,就可以使用它們,
- 在跨域一個 .await 期間,持有傳統的、對 future 無感知的鎖,也不是好主意:
- 可導致執行緒池鎖定
- 為此,可使用 futures::lock 里的 Mutex 而不是 std::sync 里的
四、Pinning
什么是 Pin
- Pin 與 Unpin 標記一起作業
- Pin 會保證實作了 !Unpin 的物件永遠不會被移動
為什么需要 Pin?
let fut_one = /* ... */; // Future 1
let fut_two = /* ... */; // Future 2
async move {
fut_one.await;
fut_two.await;
}
- 這會創建一個實作了 Future trait 的匿名型別
- 提供一個和下面代碼類似的 poll 方法
// The `Future` type generated by our `async { ... }` block
// `async { ... }`陳述句塊創建的 `Future` 型別
struct AsyncFuture {
fut_one: FutOne,
fut_two: FutTwo,
state: State,
}
// List of states our `async` block can be in
// `async` 陳述句塊可能處于的狀態
enum State {
AwaitingFutOne,
AwaitingFutTwo,
Done,
}
impl Future for AsyncFuture {
type Output = ();
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
loop {
match self.state {
State::AwaitingFutOne => match self.fut_one.poll(..) {
Poll::Ready(()) => self.state = State::AwaitingFutTwo,
Poll::Pending => return Poll::Pending,
}
State::AwaitingFutTwo => match self.fut_two.poll(..) {
Poll::Ready(()) => self.state = State::Done,
Poll::Pending => return Poll::Pending,
}
State::Done => return Poll::Ready(()),
}
}
}
}
當 poll
第一次被呼叫時,它會去查詢 fut_one
的狀態,若 fut_one
無法完成,則 poll
方法會回傳,未來對 poll
的呼叫將從上一次呼叫結束的地方開始,該程序會一直持續,直到 Future
完成為止,
如果上例中 async 塊使用參考,會如何?
async {
let mut x = [0; 128];
let read_into_buf_fut = read_into_buf(&mut x);
read_into_buf_fut.await;
println!("{:?}", x);
}
這段代碼會編譯成下面的形式:
struct ReadIntoBuf<'a> {
buf: &'a mut [u8], // 指向下面的`x`欄位
}
struct AsyncFuture {
x: [u8; 128],
read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}
這里,ReadIntoBuf
擁有一個參考欄位,指向了結構體的另一個欄位 x
,一旦 AsyncFuture
被移動,那 x
的地址也將隨之變化,此時對 x
的參考就變成了不合法的,
- 把 Future Pin(釘)到記憶體中的特定位置會防止該問題的發生:
- 可以在 async 塊里安全的創建到值的參考
Pin 介紹
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
}
impl Test {
fn new(txt: &str) -> Self {
Test {
a: String::from(txt),
b: std::ptr::null(),
}
}
fn init(&mut self) {
let self_ref: *const String = &self.a;
self.b = self_ref;
}
fn a(&self) -> &str {
&self.a
}
fn b(&self) -> &String {
assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
unsafe { &*(self.b) }
}
}
fn main() {
let mut test1 = Test::new("test1");
test1.init();
let mut test2 = Test::new("test2");
test2.init();
println!("a: {}, b: {}", test1.a(), test1.b());
println!("a: {}, b: {}", test2.a(), test2.b());
}
運行輸出
a: test1, b: test1
a: test2, b: test2
修改之后
fn main() {
let mut test1 = Test::new("test1");
test1.init();
let mut test2 = Test::new("test2");
test2.init();
println!("a: {}, b: {}", test1.a(), test1.b());
std::mem::swap(&mut test1, &mut test2); // 交換 test1 和 test2 移動資料
println!("a: {}, b: {}", test2.a(), test2.b());
}
輸出
a: test1, b: test1
a: test1, b: test2
Pin 的實踐
- Pin 型別會包裹指標型別,保證指標指向的值不被移動,
- 例如:Pin<&mut T>,Pin<&T>, Pin<Box
> - 即使 T:!Unpin,也不能保證 T 不被移動
Unpin trait
- 大多數型別如果被移動,不會造成問題,它們實作了 Unpin
- 指向 Unpin 型別的指標,可自由的放入或從 Pin 中取出
- 例如:u8 是 Unpin 的,Pin<&mut u8> 和普通的 &mut u8 一樣
- 如果型別擁有 !Unpin 標記,那么在 Pin 之后它們就無法移動了
use std::pin::Pin;
use std::marker::PhantomPinned;
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Self {
Test {
a: String::from(txt),
b: std::ptr::null(),
_marker: PhantomPinned, // 這個標記可以讓我們的型別自動實作特征`!Unpin`
}
}
fn init(self: Pin<&mut Self>) {
let self_ptr: *const String = &self.a;
let this = unsafe { self.get_unchecked_mut() };
this.b = self_ptr;
}
fn a(self: Pin<&Self>) -> &str {
&self.get_ref().a
}
fn b(self: Pin<&Self>) -> &String {
assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
unsafe { &*(self.b) }
}
}
上面代碼中,我們使用了一個標記型別 PhantomPinned
將自定義結構體 Test
變成了 !Unpin
(編譯器會自動幫我們實作),因此該結構體無法再被移動,
一旦型別實作了 !Unpin
,那將它的值固定到堆疊( stack
)上就是不安全的行為,因此在代碼中我們使用了 unsafe
陳述句塊來進行處理,你也可以使用 pin_utils
來避免 unsafe
的使用,
pub fn main() {
// test1 is safe to move before we initialize it
// 此時的`test1`可以被安全的移動
let mut test1 = Test::new("test1");
// Notice how we shadow `test1` to prevent it from being accessed again
// 新的`test1`由于使用了`Pin`,因此無法再被移動,這里的宣告會將之前的`test1`遮蔽掉(shadow)
let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
Test::init(test1.as_mut());
let mut test2 = Test::new("test2");
let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
Test::init(test2.as_mut());
println!("a: {}, b: {}", Test::a(test1.as_ref()), Test::b(test1.as_ref()));
println!("a: {}, b: {}", Test::a(test2.as_ref()), Test::b(test2.as_ref()));
}
嘗試進行交換
pub fn main() {
let mut test1 = Test::new("test1");
let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
Test::init(test1.as_mut());
let mut test2 = Test::new("test2");
let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
Test::init(test2.as_mut());
println!("a: {}, b: {}", Test::a(test1.as_ref()), Test::b(test1.as_ref()));
std::mem::swap(test1.get_mut(), test2.get_mut()); // 報錯
println!("a: {}, b: {}", Test::a(test2.as_ref()), Test::b(test2.as_ref()));
}
Pinning to the Heap 固定到堆上
use std::pin::Pin;
use std::marker::PhantomPinned;
#[derive(Debug)]
struct Test {
a: String,
b: *const String,
_marker: PhantomPinned,
}
impl Test {
fn new(txt: &str) -> Pin<Box<Self>> {
let t = Test {
a: String::from(txt),
b: std::ptr::null(),
_marker: PhantomPinned,
};
let mut boxed = Box::pin(t);
let self_ptr: *const String = &boxed.a;
unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };
boxed
}
fn a(self: Pin<&Self>) -> &str {
&self.get_ref().a
}
fn b(self: Pin<&Self>) -> &String {
unsafe { &*(self.b) }
}
}
pub fn main() {
let test1 = Test::new("test1");
let test2 = Test::new("test2");
println!("a: {}, b: {}",test1.as_ref().a(), test1.as_ref().b());
println!("a: {}, b: {}",test2.as_ref().a(), test2.as_ref().b());
}
將一個 !Unpin
型別的值固定到堆上,會給予該值一個穩定的記憶體地址,它指向的堆中的值在 Pin
后是無法被移動的,而且與固定在堆疊上不同,我們知道堆上的值在整個生命周期內都會被穩穩地固定住,
總結
-
如果 T:Unpin (默認情況),那么 Pin<'a, T> 與 &'a mut T 完全等價
- Unpin 意味著該型別如果被 Pin 了,那么它也是可以移動的,所以 Pin 對這種型別不起作用
-
如果 T:!Unpin,那么把 &mut T 變成 Pin 的 T,需要 unsafe 操作
-
大多數標準庫型別都實作了 Unpin,Rust 里大部分正常型別也是,由 async/await 生成的 Future 是個例外
-
可以使用特性標記為型別添加一個 !Unpin 系結(最新版),或者通過添加
std::marker::PhantomPinned
到型別上(穩定版) -
可以將資料 Pin 到 Stack 或 Heap 上
-
把 !Unpin 物件 Pin 到 Stack 上需要 unsafe 操作
-
把 !Unpin duix Pin 到 Heap 上不需要 unsafe 操作
- 快捷操作:使用 Box::pin
-
針對已經 Pin 的資料,如果它是 T: !Unpin 的,則需要保證它從被 Pin 后,記憶體一直有效且不會調整其用途,直到 drop 被呼叫,
- 這是 Pin 合約的重要部分
-
若
T: Unpin
( Rust 型別的默認實作),那么Pin<'a, T>
跟&'a mut T
完全相同,也就是Pin
將沒有任何效果, 該移動還是照常移動 -
絕大多數標準庫型別都實作了
Unpin
,事實上,對于 Rust 中你能遇到的絕大多數型別,該結論依然成立 ,其中一個例外就是:async/await
生成的Future
沒有實作Unpin
-
你可以通過以下方法為自己的型別添加
!Unpin
約束:- 使用文中提到的
std::marker::PhantomPinned
- 使用
nightly
版本下的feature flag
- 使用文中提到的
-
可以將值固定到堆疊上,也可以固定到堆上
- 將
!Unpin
值固定到堆疊上需要使用unsafe
- 將
!Unpin
值固定到堆上無需unsafe
,可以通過Box::pin
來簡單的實作
- 將
-
當固定型別
T: !Unpin
時,你需要保證資料從被固定到被drop這段時期內,其記憶體不會變得非法或者被重用
五、Streams
Stream trait
- Stream trait 和 Future trait 類似,但它可以在完成前產生多個值,這點和標準庫 Iterator trait 也很像
trait Stream {
/// The type of the value yielded by the stream.
// Stream生成的值的型別
type Item;
/// Attempt to resolve the next item in the stream.
/// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
/// is ready, and `Poll::Ready(None)` if the stream has completed.
// 嘗試去決議Stream中的下一個值,
// 若無資料,回傳`Poll::Pending`, 若有資料,回傳 `Poll::Ready(Some(x))`, `Stream`完成則回傳 `Poll::Ready(None)`
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
-> Poll<Option<Self::Item>>;
}
關于 Stream
的一個常見例子是訊息通道( futures
包中的)的消費者 Receiver
,每次有訊息從 Send
端發送后,它都可以接收到一個 Some(val)
值, 一旦 Send
端關閉(drop),且訊息通道中沒有訊息后,它會接收到一個 None
值,
async fn send_recv() {
const BUFFER_SIZE: usize = 10;
let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);
tx.send(1).await.unwrap();
tx.send(2).await.unwrap();
drop(tx);
// `StreamExt::next` is similar to `Iterator::next`, but returns a
// type that implements `Future<Output = Option<T>>`.
// `StreamExt::next` 類似于 `Iterator::next`, 但是前者回傳的不是值,而是一個 `Future<Output = Option<T>>`,
// 因此還需要使用`.await`來獲取具體的值
assert_eq!(Some(1), rx.next().await);
assert_eq!(Some(2), rx.next().await);
assert_eq!(None, rx.next().await);
}
Iteration and Concurrency 迭代與并發
- 與同步的 Iterator 類似,有很多方法迭代和處理 Stream 中的值:
- 組合器風格的:map、filter、fold
- 相應的 "early-exit-on-error" 版本:try_map、try_filter、try_fold
- for 回圈無法和 Stream 一起使用
- 命令式的 while let 和 next/try_next 函式可以與 Stream 一起使用
例子
async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
use futures::stream::StreamExt; // for `next`
let mut sum = 0;
while let Some(item) = stream.next().await {
sum += item;
}
sum
}
async fn sum_with_try_next(
mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
use futures::stream::TryStreamExt; // for `try_next`
let mut sum = 0;
while let Some(item) = stream.try_next().await? {
sum += item;
}
Ok(sum)
}
上面代碼是一次處理一個值的模式,但是需要注意的是:如果你選擇一次處理一個值的模式,可能會造成無法并發,這就失去了異步編程的意義, 因此,如果可以的話我們還是要選擇從一個 Stream
并發處理多個值的方式,通過 for_each_concurrent
或 try_for_each_concurrent
方法來實作:
async fn jump_around(
mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
const MAX_CONCURRENT_JUMPERS: usize = 100;
stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
jump_n_times(num).await?;
report_n_jumps(num).await?;
Ok(())
}).await?;
Ok(())
}
六、Executing Multiple Futures at a Time(同時執行多個 Future)
本節內容
- 真正的異步應用程式通常需要同時執行幾個不同的操作
- 介紹一些可同時執行多個異步操作的方式:
- Join!,等待所有 Future 完成
- Select!,等待多個 future 中的一個完成
- Spawning,創建一個頂級任務,他會運行一個 future 直至完成
- FutureUnordered,一組 Future,它們會產生每個子 Future 的結果
6.1 join!
join!
- futures::join 宏,它使得在等待多個 future完成的時候,可以同時并發的執行它們,
例子
async fn get_book_and_music() -> (Book, Music) {
let book = get_book().await;
let music = get_music().await;
(book, music)
}
要支持同時看書和聽歌,有些人可能會生成下面代碼:
// WRONG -- don't do this
async fn get_book_and_music() -> (Book, Music) {
let book_future = get_book();
let music_future = get_music();
(book_future.await, music_future.await)
}
為了正確的并發運行兩個 Future
, 我們來試試 futures::join!
宏:
use futures::join;
async fn get_book_and_music() -> (Book, Music) {
let book_fut = get_book();
let music_fut = get_music();
join!(book_fut, music_fut)
}
try_join!
- 對于回傳 Result 的 future,更考慮使用 try_join!
- 如果子 future 中某一個回傳了錯誤,try_join! 會立即完成
例子
use futures::try_join;
async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book();
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
有一點需要注意,傳給 try_join!
的所有 Future
都必須擁有相同的錯誤型別,如果錯誤型別不同,可以考慮使用來自 futures::future::TryFutureExt
模塊的 map_err
和err_info
方法將錯誤進行轉換:
use futures::{
future::TryFutureExt,
try_join,
};
async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }
async fn get_book_and_music() -> Result<(Book, Music), String> {
let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
let music_fut = get_music();
try_join!(book_fut, music_fut)
}
6.2 select!
select!
- futures::select 宏可同時運行多個 future,允許用戶在任意一個 future 完成時進行回應
use futures::{
future::FutureExt, // for `.fuse()`
pin_mut,
select,
};
async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }
async fn race_tasks() {
let t1 = task_one().fuse();
let t2 = task_two().fuse();
pin_mut!(t1, t2);
select! {
() = t1 => println!("task one completed first"),
() = t2 => println!("task two completed first"),
}
}
default => ... 和 complete => ...
- select 支持 default 和 complete 分支
- default:如果選中的 future 尚未完成,就會運行 default 分支
- 擁有 default 的 select 總是會立即回傳
- complete:它用于所有選中的 future 都已完成的情況,
complete
分支當所有的Future
和Stream
完成后才會被執行,它往往配合loop
使用,loop
用于回圈完成所有的Future
default
分支,若沒有任何Future
或Stream
處于Ready
狀態, 則該分支會被立即執行
例子
use futures::{future, select};
async fn count() {
let mut a_fut = future::ready(4);
let mut b_fut = future::ready(6);
let mut total = 0;
loop {
select! {
a = a_fut => total += a,
b = b_fut => total += b,
complete => break,
default => unreachable!(), // never runs (futures are ready, then complete) 該分支永遠不會運行,因為`Future`會先運行,然后是`complete`
};
}
assert_eq!(total, 10);
}
與 Unpin 和 FusedFuture 互動
- 前面的例子中,需要在回傳的 future 上呼叫
.fuse()
,也呼叫了 pin_mut,- 因為 select 里面的 future 必須使用 Unpin 和 FusedFuture 這兩個 trait,
- 必須 Unpin:select 使用的 future 不是按值的,而是按可變參考,
- 未完成的 future 在呼叫 select 后 仍可使用
- 必須 FusedFuture:在 future 完成后,select 不可以對它進行 poll
- 實作 FusedFuture 的 future 會追蹤其完成狀態,這樣在 select 回圈里,就只會 poll 沒有完成的 future
Stream 也有 FusedStream trait
首先,.fuse()
方法可以讓 Future
實作 FusedFuture
特征, 而 pin_mut!
宏會為 Future
實作 Unpin
特征,這兩個特征恰恰是使用 select
所必須的:
Unpin
,由于select
不會通過拿走所有權的方式使用Future
,而是通過可變參考的方式去使用,這樣當select
結束后,該Future
若沒有被完成,它的所有權還可以繼續被其它代碼使用,FusedFuture
的原因跟上面類似,當Future
一旦完成后,那select
就不能再對其進行輪詢使用,Fuse
意味著熔斷,相當于Future
一旦完成,再次呼叫poll
會直接回傳Poll::Pending
,
只有實作了FusedFuture
,select
才能配合 loop
一起使用,假如沒有實作,就算一個 Future
已經完成了,它依然會被 select
不停的輪詢執行,
Stream
稍有不同,它們使用的特征是 FusedStream
, 通過.fuse()
(也可以手動實作)實作了該特征的 Stream
,對其呼叫.next()
或 .try_next()
方法可以獲取實作了FusedFuture
特征的Future
:
use futures::{
stream::{Stream, StreamExt, FusedStream},
select,
};
async fn add_two_streams(
mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
let mut total = 0;
loop {
let item = select! {
x = s1.next() => x,
x = s2.next() => x,
complete => break,
};
if let Some(next_num) = item {
total += next_num;
}
}
total
}
select 回圈里使用 Fuse 和 FuturesUnordered
- Fuse::terminated(),允許構建空的、已完成的 future,后續可以為它填充一個需要運行的 future
- 適用于在 select 回圈里產生且需要在這運行的任務,這種場景
- 當同個 future 的多個副本需同時運行時,使用 FuturesUnordered 型別
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, Stream, StreamExt},
pin_mut,
select,
};
async fn get_new_num() -> u8 { /* ... */ 5 }
async fn run_on_new_num(_: u8) { /* ... */ }
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
let get_new_num_fut = Fuse::terminated();
pin_mut!(run_on_new_num_fut, get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
// The timer has elapsed. Start a new `get_new_num_fut`
// if one was not already running.
// 定時器已結束,若`get_new_num_fut`沒有在運行,就創建一個新的
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
// A new number has arrived -- start a new `run_on_new_num_fut`,
// dropping the old one.
// 收到新的數字 -- 創建一個新的`run_on_new_num_fut`并丟棄掉舊的
run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
},
// Run the `run_on_new_num_fut` // 運行 `run_on_new_num_fut`
() = run_on_new_num_fut => {},
// panic if everything completed, since the `interval_timer` should
// keep yielding values indefinitely.
// 若所有任務都完成,直接 `panic`, 原因是 `interval_timer` 應該連續不斷的產生值,而不是結束 //后,執行到 `complete` 分支
complete => panic!("`interval_timer` completed unexpectedly"),
}
}
}
當某個 Future
有多個拷貝都需要同時運行時,可以使用 FuturesUnordered
型別,下面的例子跟上個例子大體相似,但是它會將 run_on_new_num_fut
的每一個拷貝都運行到完成,而不是像之前那樣一旦創建新的就終止舊的,
use futures::{
future::{Fuse, FusedFuture, FutureExt},
stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
pin_mut,
select,
};
async fn get_new_num() -> u8 { /* ... */ 5 }
async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }
// 使用從 `get_new_num` 獲取的最新數字 來運行 `run_on_new_num`
//
// 每當計時器結束后,`get_new_num` 就會運行一次,它會立即取消當前正在運行的`run_on_new_num` ,
// 并且使用新回傳的值來替換
async fn run_loop(
mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
starting_num: u8,
) {
let mut run_on_new_num_futs = FuturesUnordered::new();
run_on_new_num_futs.push(run_on_new_num(starting_num));
let get_new_num_fut = Fuse::terminated();
pin_mut!(get_new_num_fut);
loop {
select! {
() = interval_timer.select_next_some() => {
// The timer has elapsed. Start a new `get_new_num_fut`
// if one was not already running.
// 定時器已結束,若`get_new_num_fut`沒有在運行,就創建一個新的
if get_new_num_fut.is_terminated() {
get_new_num_fut.set(get_new_num().fuse());
}
},
new_num = get_new_num_fut => {
// A new number has arrived -- start a new `run_on_new_num_fut`.
// 收到新的數字 -- 創建一個新的`run_on_new_num_fut` (并沒有像之前的例子那樣丟棄掉舊值)
run_on_new_num_futs.push(run_on_new_num(new_num));
},
// Run the `run_on_new_num_futs` and check if any have completed
// 運行 `run_on_new_num_futs`, 并檢查是否有已經完成的
res = run_on_new_num_futs.select_next_some() => {
println!("run_on_new_num_fut returned {:?}", res);
},
// panic if everything completed, since the `interval_timer` should
// keep yielding values indefinitely.
// 若所有任務都完成,直接 `panic`, 原因是 `interval_timer` 應該連續不斷的產生值,而不是結束
//后,執行到 `complete` 分支
complete => panic!("`interval_timer` completed unexpectedly"),
}
}
}
七、Workarounds to Know and Love 一些疑難問題的解決辦法
7.1 async 塊中的 ?
- Async 塊中使用 ? 是比較常見的
- 但是 async 塊的回傳型別沒有明確說明,這可能會導致編譯器無法推斷 async 塊的錯誤型別
- 目前沒法給 future 一個型別,也無法指明其型別
- 臨時解決辦法:使用 “turbofish”運算子,為 async 塊提供成功和錯誤型別
#![allow(unused)]
fn main() {
struct MyError
async fn foo() -> Result<(), MyError> {
Ok(())
}
async fn bar() -> Result<(), MyError> {
Ok(())
}
let fut = async {
foo().await?;
bar().await?;
Ok(()) // 報錯 cannot infer type for type parameter `E` declared on the enum `Result`
};
}
可以使用 ::< ... >
的方式來增加型別注釋:
#![allow(unused)]
fn main() {
struct MyError
async fn foo() -> Result<(), MyError> {
Ok(())
}
async fn bar() -> Result<(), MyError> {
Ok(())
}
let fut = async {
foo().await?;
bar().await?;
Ok::<(), MyError>(()) // <- note the explicit type annotation here
};
}
7.2 Send
Approximation
- 有些 async fn 狀態機可安全的跨執行緒發送,有些則不行
- async future 是否是 Send 的,取決于在 .await 點是否持有非 Send 的型別
- 當值可能在跨域 .await 點被持有時,編譯器會盡力近似估算,但這種估算在很多地方顯得過于保守
- 臨時辦法:引入塊作用域,把 non-Send 變數隔離
Rc
無法在多執行緒環境使用,原因就在于它并未實作 Send
特征
use std::rc::Rc;
#[derive(Default)]
struct NotSend(Rc<()>);
async fn bar() {}
async fn foo() {
NotSend::default();
bar().await;
}
fn require_send(_: impl Send) {}
fn main() {
require_send(foo());
}
即使上面的 foo
回傳的 Future
是 Send
, 但是在它內部短暫的使用 NotSend
依然是安全的,原因在于它的作用域并沒有影響到 .await
,下面來試試宣告一個變數,然后讓 .await
的呼叫處于變數的作用域中試試:
async fn foo() {
let x = NotSend::default();
bar().await;
}
報錯:future cannot be sent between threads safely
.await
在運行時處于 x
的作用域內,.await有可能被執行器調度到另一個執行緒上運行,而
Rc并沒有實作
Send,
可以將變數宣告在陳述句塊內,當陳述句塊結束時,變數會自動被 drop
async fn foo() {
{
let x = NotSend::default();
}
bar().await;
}
7.3 Recursion
- 在內部,async fn 會創建一個狀態機型別,它含有每個被 .awaited 的子 future
- 這就有點麻煩,因為狀態機需要包含其本身
- 臨時辦法:引入間接,使用Box,并把 recursive 放入非 async 的函式,它會回傳 .boxed() async 塊
// This function: foo函式:
async fn foo() {
step_one().await;
step_two().await;
}
// generates a type like this: 會被編譯成類似下面的型別:
enum Foo {
First(StepOne),
Second(StepTwo),
}
// So this function: recursive函式
async fn recursive() {
recursive().await;
recursive().await;
}
// generates a type like this: 會生成類似以下的型別
enum Recursive {
First(Recursive),
Second(Recursive),
}
這是典型的動態大小型別,它的大小會無限增長,因此編譯器會直接報錯:
error[E0733]: recursion in an `async fn` requires boxing
--> src/lib.rs:1:22
|
1 | async fn recursive() {
| ^ an `async fn` cannot invoke itself directly
|
= note: a recursive `async fn` must be rewritten to return a boxed future.
recursive
轉變成一個正常的函式,該函式回傳一個使用 Box
包裹的 async
陳述句塊:
use futures::future::{BoxFuture, FutureExt};
fn recursive() -> BoxFuture<'static, ()> {
async move {
recursive().await;
recursive().await;
}.boxed()
}
7.4 async in Trait
- 目前 async fn 不可用在 trait 里
- 臨時解決辦法:使用 async-trait
在目前版本中,我們還無法在特征中定義 async fn
函式,不過大家也不用擔心,目前已經有計劃在未來移除這個限制了,
trait Test {
async fn test();
}
運行后報錯:
error[E0706]: functions in traits cannot be declared `async`
--> src/main.rs:5:5
|
5 | async fn test();
| -----^^^^^^^^^^^
| |
| `async` because of this
|
= note: `async` trait functions are not currently supported
= note: consider using the `async-trait` crate: https://crates.io/crates/async-trait
好在編譯器給出了提示,讓我們使用 async-trait
解決這個問題:
use async_trait::async_trait;
#[async_trait]
trait Advertisement {
async fn run(&self);
}
struct Modal;
#[async_trait]
impl Advertisement for Modal {
async fn run(&self) {
self.render_fullscreen().await;
for _ in 0..4u16 {
remind_user_to_join_mailing_list().await;
}
self.hide_for_now().await;
}
}
struct AutoplayingVideo {
media_url: String,
}
#[async_trait]
impl Advertisement for AutoplayingVideo {
async fn run(&self) {
let stream = connect(&self.media_url).await;
stream.play().await;
// 用視頻說服用戶加入我們的郵件串列
Modal.run().await;
}
}
不過使用該包并不是免費的,每一次特征中的async
函式被呼叫時,都會產生一次堆記憶體分配,對于大多數場景,這個性能開銷都可以接受,但是當函式一秒呼叫幾十萬、幾百萬次時,就得小心這塊兒代碼的性能了!
八、The Async Ecosystem
Rust 沒提供什么
- Rust 目前只提供撰寫 async 代碼的基本要素,標準庫中尚未提供執行器、任務、反應器、組合器以及低級 I/O future 和 trait
- 社區提供的 async 生態系統填補了這些空白
Async 運行時
- Async 運行時是用于執行 async 應用程式的庫
- 運行時通常將一個反應器與一個或多個執行器捆綁在一起
- 反應器為異步I/O、行程間通信和計時器等外部事件提供訂閱機制
- 在 async 運行時中,訂閱者通常是代表低級別 I/O 操作的 future
- 執行者負責安排和執行任務
- 它們跟蹤正在運行和暫停的任務,對future進行 poll 直到完成,并在任務能夠取得進展時喚醒任務
- “執行者”一詞經常與“運行時”互換使用
- 我們使用“生態系統”一詞來描述一個與兼容 trait 和特性捆綁在一起的運行時
社區提供的 async crates
- futures crate ,提供了 Stream、Sink、AsyncRead、AsyncWrite 等 trait,以及組合器等工具,這些可能最侄訓成為標準庫的一部分
- Futures 有自己的執行器,但沒有自己的反應器,因此它不支持 async I/O 或計時器 future 的執行
- 因此,它不被認為是完整的運行時
- 常見的選擇是: 與另一個 crate 中的執行器一起使用來自 futures 提供的工具
流行的運行時
- Tokio:一個流行的 async 生態系統,包含 HTTP、gRPC 和跟蹤框架
- Async-std:提供標準庫的 async 副本
- Smol:小型、簡化的 async 運行時,提供可用于包裝 UnixStream 或 TcpListener 等結構的 async trait
- Fuchsia-async:用于 Fuchsia OS 的執行器
確定生態兼容性
- 與 async I/O、計時器、行程間通信或任務互動的 async 代碼通常取決于特定的異步執行器或反應器
- 所有其他 async 代碼,如異步運算式、組合器、同步型別和流,通常與生態系統無關,前提是任何嵌套的 future 也與生態系統無關
- 在開始一個專案之前,建議研究相關的 async 框架和庫,以確保與您選擇的運行時以及彼此之間的兼容性
單執行緒 VS 多執行緒執行器
- async 執行器可以是單執行緒或多執行緒的
- 多執行緒執行器可以在多個任務上同時取得進展,對于有許多任務的作業負載,它可以大大加快執行速度,但在任務之間同步資料通常更昂貴
- 在單執行緒運行時和多執行緒運行時之間進行選擇時,建議測量應用程式的性能
- 任務可以在創建它們的執行緒上運行,也可以在單獨的執行緒上運行
- 異步運行時通常提供將任務生成到單獨執行緒的功能
- 即使任務在不同的執行緒上執行,它們也應該是非阻塞的
- 為了在多執行緒執行器上調度任務,它們必須是 Send 的
- 一些運行時提供了生成非 Send 的任務的函式,確保每個任務都在生成它的執行緒上執行
- 它們還可以提供將阻塞任務生成到專用執行緒的函式,這對于運行來自其他庫的阻塞同步代碼非常有用
九、Final Project: Building a Concurrent Web Server with Async Rust
構建并發的(單執行緒的)Web Server
- 使用 async Rust 把 《The Rust Programming Language》(《Rust 權威指南》)一書中第 20 章的例子改為可并發處理請求的 Web Server
server on master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base
? tree
.
├── 404.html
├── Cargo.lock
├── Cargo.toml
├── hello.html
├── src
│ └── main.rs
└── target
├── CACHEDIR.TAG
└── debug
├── build
├── deps
│ ├── libserver-4f1db2ad8f9f3102.rmeta
│ ├── libserver-d3461d12acfa742e.rmeta
│ ├── server-4f1db2ad8f9f3102.d
│ └── server-d3461d12acfa742e.d
├── examples
└── incremental
├── server-1ufb07a5oidqf
│ ├── s-glb12pwpr6-1ctgjrw-2k6dj6f8psanp
│ │ ├── dep-graph.bin
│ │ ├── query-cache.bin
│ │ └── work-products.bin
│ └── s-glb12pwpr6-1ctgjrw.lock
└── server-3vy5xgpwslzqx
├── s-glb12pwpr7-10sbywt-3rbnmvgc9rslx
│ ├── dep-graph.bin
│ ├── query-cache.bin
│ └── work-products.bin
└── s-glb12pwpr7-10sbywt.lock
12 directories, 18 files
server on master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base
?
main.rs
use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;
fn main() {
// Listen for incoming TCP connections on localhost port 7878 監聽本地埠 7878 ,等待 TCP 連接的建立
let listener = TcpListener::bind("127.0.0.1:7878").unwrap();
// Block forever, handling each request that arrives at this IP address 阻塞等待請求的進入
for stream in listener.incoming() {
let stream = stream.unwrap();
handle_connection(stream);
}
}
fn handle_connection(mut stream: TcpStream) {
// Read the first 1024 bytes of data from the stream 從連接中順序讀取 1024 位元組資料
let mut buffer = [0; 1024];
stream.read(&mut buffer).unwrap();
let get = b"GET / HTTP/1.1\r\n";
// Respond with greetings or a 404,
// depending on the data in the request 處理HTTP協議頭,若不符合則回傳404和對應的`html`檔案
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
// Write response back to the stream, 將回復內容寫入連接快取中
// and flush the stream to ensure the response is sent back to the client 使用flush將快取中的內容發送到客戶端
let response = format!("{status_line}{contents}");
stream.write_all(response.as_bytes()).unwrap();
stream.flush().unwrap();
}
hello.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Hello!</h1>
<p>Hi from Rust</p>
</body>
</html>
404.html
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="utf-8">
<title>Hello!</title>
</head>
<body>
<h1>Oops!</h1>
<p>Sorry, I don't know what you're asking for.</p>
</body>
</html>
運行
server on master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base took 5.0s
? cargo run
Finished dev [unoptimized + debuginfo] target(s) in 0.00s
Running `target/debug/server`


并發地處理連接
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::task;
use futures::stream::StreamExt;
use std::fs;
use std::time::Duration;
#[async_std::main]
async fn main() {
// Listen for incoming TCP connections on localhost port 7878 監聽本地埠 7878 ,等待 TCP 連接的建立
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener.incoming().for_each_concurrent(/* limit */ None, |tcpstream| async move {
let tcpstream = tcpstream.unwrap();
handle_connection(tcpstream).await;
})
.await;
}
async fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).await.unwrap();
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else if buffer.starts_with(sleep) {
// async_std::task::sleep,它僅會讓當前的任務陷入睡眠,然后該任務會讓出執行緒的控制權,這樣執行緒就可以繼續運行其它任務,
task::sleep(Duration::from_secs(5)).await;
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!("{status_line}{contents}");
stream.write(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
}
Cargo .toml
[package]
name = "server"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
futures = "0.3.28"
異步版本的 TcpListener
為 listener.incoming()
實作了 Stream
特征,以上修改有兩個好處:
listener.incoming()
不再阻塞- 使用
for_each_concurrent
并發地處理從Stream
獲取的元素
訪問:http://127.0.0.1:7878/ 和 http://127.0.0.1:7878/sleep
使用多執行緒并行處理請求
use async_std::net::TcpListener;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::task;
use async_std::task::spawn;
use futures::stream::StreamExt;
use std::fs;
use std::time::Duration;
#[async_std::main]
async fn main() {
// Listen for incoming TCP connections on localhost port 7878 監聽本地埠 7878 ,等待 TCP 連接的建立
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener.incoming().for_each_concurrent(/* limit */ None, |tcpstream| async move {
let tcpstream = tcpstream.unwrap();
// handle_connection(tcpstream).await;
spawn(handle_connection(tcpstream));
})
.await;
}
async fn handle_connection(mut stream: TcpStream) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).await.unwrap();
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else if buffer.starts_with(sleep) {
// async_std::task::sleep,它僅會讓當前的任務陷入睡眠,然后該任務會讓出執行緒的控制權,這樣執行緒就可以繼續運行其它任務,
task::sleep(Duration::from_secs(5)).await;
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!("{status_line}{contents}");
stream.write(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
}
Testing the TCP Server 測驗 handle_connection 函式
use async_std::io::{Read, Write};
use async_std::net::TcpListener;
use async_std::prelude::*;
use async_std::task;
use async_std::task::spawn;
use futures::stream::StreamExt;
use std::fs;
use std::marker::Unpin;
use std::time::Duration;
#[async_std::main]
async fn main() {
// Listen for incoming TCP connections on localhost port 7878 監聽本地埠 7878 ,等待 TCP 連接的建立
let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();
listener
.incoming()
.for_each_concurrent(/* limit */ None, |tcpstream| async move {
let tcpstream = tcpstream.unwrap();
// handle_connection(tcpstream).await;
spawn(handle_connection(tcpstream));
})
.await;
}
async fn handle_connection(mut stream: impl Read + Write + Unpin) {
let mut buffer = [0; 1024];
stream.read(&mut buffer).await.unwrap();
let get = b"GET / HTTP/1.1\r\n";
let sleep = b"GET /sleep HTTP/1.1\r\n";
let (status_line, filename) = if buffer.starts_with(get) {
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else if buffer.starts_with(sleep) {
// async_std::task::sleep,它僅會讓當前的任務陷入睡眠,然后該任務會讓出執行緒的控制權,這樣執行緒就可以繼續運行其它任務,
task::sleep(Duration::from_secs(5)).await;
("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
} else {
("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
};
let contents = fs::read_to_string(filename).unwrap();
let response = format!("{status_line}{contents}");
stream.write(response.as_bytes()).await.unwrap();
stream.flush().await.unwrap();
}
#[cfg(test)]
mod tests {
use super::*;
use futures::io::Error;
use futures::task::{Context, Poll};
use std::cmp::min;
use std::pin::Pin;
struct MockTcpStream {
read_data: Vec<u8>,
write_data: Vec<u8>,
}
impl Read for MockTcpStream {
fn poll_read(
self: Pin<&mut Self>,
_: &mut Context,
buf: &mut [u8],
) -> Poll<Result<usize, Error>> {
let size: usize = min(self.read_data.len(), buf.len());
buf[..size].copy_from_slice(&self.read_data[..size]);
Poll::Ready(Ok(size))
}
}
impl Write for MockTcpStream {
fn poll_write(
// poll_write 會拷貝輸入資料到mock的 TcpStream 中,當完成后回傳 Poll::Ready
mut self: Pin<&mut Self>,
_: &mut Context,
buf: &[u8],
) -> Poll<Result<usize, Error>> {
self.write_data = https://www.cnblogs.com/QiaoPengjun/archive/2023/05/26/Vec::from(buf);
Poll::Ready(Ok(buf.len()))
}
fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
Poll::Ready(Ok(()))
}
fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
Poll::Ready(Ok(()))
}
}
use std::marker::Unpin;
impl Unpin for MockTcpStream {}
use std::fs;
#[async_std::test]
async fn test_handle_connection() {
let input_bytes = b"GET / HTTP/1.1\r\n";
let mut contents = vec![0u8; 1024];
contents[..input_bytes.len()].clone_from_slice(input_bytes);
let mut stream = MockTcpStream {
read_data: contents,
write_data: Vec::new(),
};
handle_connection(&mut stream).await;
let mut buf = [0u8; 1024];
stream.read(&mut buf).await.unwrap();
let expected_contents = fs::read_to_string("hello.html").unwrap();
let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{}", expected_contents);
assert!(stream.write_data.starts_with(expected_response.as_bytes()));
}
}
運行
server on master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base took 53m 39.4s
? cargo test
Compiling server v0.1.0 (/Users/qiaopengjun/rust/server)
Finished test [unoptimized + debuginfo] target(s) in 0.64s
Running unittests src/main.rs (target/debug/deps/server-c69496e95f46c228)
running 1 test
test tests::test_handle_connection ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s
server on master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base
?
本文來自博客園,作者:QIAOPENGJUN,轉載請注明原文鏈接:https://www.cnblogs.com/QiaoPengjun/p/17434443.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/553516.html
標籤:其他
上一篇:【華為機試】單詞倒敘
下一篇:返回列表