主頁 > 後端開發 > Rust async 編程

Rust async 編程

2023-05-26 18:37:19 後端開發

Rust async 編程

Asynchronous Programming in Rust:https://rust-lang.github.io/async-book/

中文書名《Rust 異步編程指南》:https://github.com/rustlang-cn/async-book

Rust語言圣經(Rust Course):https://course.rs/advance/async/getting-started.html

一、Getting Started

1.1 為什么使用 async

為什么使用 async

  • Async 編程,是一種并發(concurrent)編程模型
    • 允許你在少數系統執行緒上運行大量的并發任務
    • 通過 async/await 語法,看起來和同步編程差不多

其它的并發模型

  • OS 執行緒
    • 無需改變任何編程模型,執行緒間同步困難,性能開銷大
    • 執行緒池可以降低一些成本,但難以支撐大量 IO 系結的作業
  • Event-driven 編程
    • 與回呼函式一起用,可能高效
    • 非線性的控制流,資料流和錯誤傳播難以追蹤
  • 協程(Coroutines)
    • 類似執行緒,無需改變編程模型
    • 類似 async ,支持大量任務
    • 抽象掉了底層細節(這對系統編程、自定義運行時的實作很重要)
  • Actor 模型
    • 將所有并發計算劃分為 actor , 訊息通信易出錯
    • 可以有效的實作 actor 模型,但許多實際問題沒解決(例如流程控制、重試邏輯)

Rust 中的 async

  • Future 是惰性的
    • 只有poll時才能取得進展, 被丟棄的 future 就無法取得進展了
  • Async是零成本的
    • 使用async ,可以無需堆記憶體分配(heap allocation)和動態調度(dynamic dispatch),對性能大好,且允許在受限環境使用 async
  • 不提供內置運行時
    • 運行時由Rust 社區提供,例如 tokio
  • 單執行緒、多執行緒均支持
    • 這兩者擁有各自的優缺點

Rust 中的 async 和執行緒(thread)

  • OS 執行緒:
    • 適用于少量任務,有記憶體和CPU開銷,且執行緒生成和執行緒間切換非常昂貴
    • 執行緒池可以降低一些成本
    • 允許重用同步代碼,代碼無需大改,無需特定編程模型
    • 有些系統支持修改執行緒優先級
  • Async:
    • 顯著降低記憶體和CPU開銷
    • 同等條件下,支持比執行緒多幾個數量級的任務(少數執行緒支撐大量任務)
    • 可執行檔案大(需要生成狀態機,每個可執行檔案捆綁一個異步運行時)

Async 并不是比執行緒好,只是不同而已!

總結:

  • 有大量 IO 任務需要并發運行時,選 async 模型
  • 有部分 IO 任務需要并發運行時,選多執行緒,如果想要降低執行緒創建和銷毀的開銷,可以使用執行緒池
  • 有大量 CPU 密集任務需要并行運行時,例如并行計算,選多執行緒模型,且讓執行緒數等于或者稍大于 CPU 核心數
  • 無所謂時,統一選多執行緒

例子

如果想并發的下載檔案,你可以使用多執行緒如下實作:

fn get_two_sites() {
    // Spawn two threads to do work. 創建兩個新執行緒執行任務
    let thread_one = thread::spawn(|| download("https://www.foo.com"));
    let thread_two = thread::spawn(|| download("https://www.bar.com"));

    // Wait for both threads to complete. 等待兩個執行緒的完成
    thread_one.join().expect("thread one panicked");
    thread_two.join().expect("thread two panicked");
}

使用async的方式:

async fn get_two_sites_async() {
    // Create two different "futures" which, when run to completion, 創建兩個不同的`future`,你可以把`future`理解為未來某個時刻會被執行的計劃任務
    // will asynchronously download the webpages. 當兩個`future`被同時執行后,它們將并發的去下載目標頁面
    let future_one = download_async("https://www.foo.com");
    let future_two = download_async("https://www.bar.com");

    // Run both futures to completion at the same time. 同時運行兩個`future`,直至完成
    join!(future_one, future_two);
}

自定義并發模型

  • 除了執行緒和async,還可以用其它的并發模型(例如 event-driven)

1.2 Rust Async 的目前狀態

Async Rust 目前的狀態

  • 部分穩定,部分仍在變化,
  • 特點:
    • 針對典型并發任務,性能出色
    • 與高級語言特性頻繁互動(生命周期、pinning)
    • 同步和異步代碼間、不同運行時的異步代碼間存在兼容性約束
    • 由于不斷進化,維護負擔更重

語言和庫的支持

  • 雖然Rust本身就支持Async編程,但很多應用依賴與社區的庫:
    • 標準庫提供了最基本的特性、型別和功能,例如 Future trait
    • async/await 語法直接被Rust編譯器支持
    • futures crate 提供了許多實用型別、宏和函式,它們可以用于任何異步應用程式,
    • 異步代碼、IO 和任務生成的執行由 "async runtimes" 提供,例如 Tokio 和 async-std,大多數async 應用程式和一些 async crate 都依賴于特定的運行時,

注意

  • Rust 不允許你在 trait 里宣告 async 函式

編譯和除錯

  • 編譯錯誤:
    • 由于 async 通常依賴于更復雜的語言功能,例如生命周期和Pinning,因此可能會更頻繁地遇到這些型別的錯誤,
  • 運行時錯誤:
    • 每當運行時遇到異步函式,編譯器會在后臺生成一個狀態機,Stack traces 里有其明細,以及運行時呼叫的函式,因此解釋起來更復雜,
  • 新的失效模式:
    • 可能出現一些新的故障,它們可以通過編譯,甚至單元測驗,

兼容性考慮

  • async和同步代碼不能總是自由組合

    • 例如,不能直接從同步函式來呼叫 async 異步函式
  • Async 代碼間也不總是能自由組合

    • 一些crate依賴于特定的 async 運行時
  • 因此,盡早研究確定使用哪個 async 運行時

性能特征

  • async 的性能依賴于運行時的表現(通常較出色)

1.3 async/await 入門

async

  • async 把一段代碼轉化為一個實作了Future trait 的狀態機
  • 雖然在同步方法中呼叫阻塞函式會阻塞整個執行緒,但阻塞的Future將放棄對執行緒的控制,從而允許其它Future來運行,
~/rust via ?? base
? cargo new async_demo
     Created binary (application) `async_demo` package

~/rust via ?? base
? cd async_demo

async_demo on  master [?] via ?? 1.67.1 via ?? base
? c

async_demo on  master [?] via ?? 1.67.1 via ?? base
?


代碼

#![allow(unused)]
fn main() {
    async fn do_something() {/* ... */}
}

Cargo .toml

[package]
name = "async_demo"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
futures = "0.3.28"

async fn

  • 異步函式語法:
    • async fn do_something() {/* ... */}
    • async fn 回傳的是 Future,Future 需要由一個執行者來運行
  • futures::executor::block_on;
    • block_on 阻塞當前執行緒,直到提供的 Future 運行完成
    • 其它執行者提供更復雜的行為,例如將多個 Future 安排到同一個執行緒上
use futures::executor::block_on;

async fn hello_world() {
    println!("Hello world!");
}

fn main() {
    let future = hello_world(); // 什么都沒列印出來
    block_on(future); // `future` 運行,并列印出 "Hello world!"
}

Await

  • 在 async fn 中,可以使用 .await 來等待另一個實作 Future trait 的完成
  • 與 block_on 不同,.await不會阻塞當前執行緒,而是異步的等待 Future 的完成(如果該 Future 目前無法取得進展,就允許其他任務執行)
use futures::executor::block_on;

struct Song {}

async fn learn_song() -> Song {
    Song {}
}
async fn sing_song(song: Song) { /* ... */
}

async fn dance() { /* ... */
}

fn main() {
    let song = block_on(learn_song());
    block_on(sing_song(song));
    block_on(dance());
}

修改之后

use futures::executor::block_on;

struct Song {}

async fn learn_song() -> Song {
    Song {}
}
async fn sing_song(song: Song) {}

async fn dance() {}

async fn learn_and_sing() {
    let song = learn_song().await;
    sing_song(song).await;
}

async fn async_main() {
    let f1 = learn_and_sing();
    let f2 = dance();
    futures::join!(f1, f2);
}

fn main() {
    block_on(async_main());
}

二、幕后原理:執行 Future 和任務

2.1 Future trait

Future trait

  • Future trait是 Rust Async異步編程的核心
  • Future 是一種異步計算,它可以產生一個值
  • 實作了 Future 的型別表示目前可能還不可用的值

下面是一個簡化版的 Future trait:

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

enum Poll<T> {
    Ready(T),
    Pending,
}

  • Future 可以表示:

    • 下一次網路資料包的到來

    • 下一次滑鼠的移動

    • 或者僅僅是經過一段時間的時間點

  • Future 代表著一種你可以檢驗其是否完成的操作

  • Future 可以通過呼叫 poll 函式來取得進展

    • poll 函式會驅動 Future 盡可能接近完成
    • 如果 Future 完成了:就回傳 poll::Ready(result),其中 result 就是最終的結果
    • 如果 Future 還無法完成:就回傳 poll::Pending,并當 Future 準備好取得更多進展時呼叫一個 waker 的wake() 函式
  • 針對 Future,你唯一能做的就是使用 poll 來敲它,直到一個值掉出來

wake() 函式

  • 當 wake() 函式被呼叫時:
    • 執行器將驅動 Future 再次呼叫 poll 函式,以便 Future 能取得更多的進展
  • 沒有wake() 函式,執行器就不知道特定的 Future 何時能取得進展(就得不斷的 poll)
  • 通過 wake() 函式,執行器就確切的知道哪些 Future 已準備好進行 poll() 的呼叫

例子

pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data -- read it into a buffer and return it. socket有資料,寫入buffer中并回傳
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data. socket中還沒資料
            //
            // Arrange for `wake` to be called once data is available. 注冊一個`wake`函式,當資料可用時,該函式會被呼叫,
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data. 然后當前Future的執行器會再次呼叫`poll`方法,此時就可以讀取到資料
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}

例子

  • 組合多個異步操作,而無需中間分配
  • 可以通過無分配的狀態機來實作多個 Future 同時運行或串聯運行
/// A SimpleFuture that runs two other futures to completion concurrently. 它會并發地運行兩個Future直到它們完成
///
/// Concurrency is achieved via the fact that calls to `poll` each future
/// may be interleaved, allowing each future to advance itself at its own pace. 之所以可以并發,是因為兩個Future的輪詢可以交替進行,一個阻塞,另一個就可以立刻執行,反之亦然
pub struct Join<FutureA, FutureB> {
    // Each field may contain a future that should be run to completion. 結構體的每個欄位都包含一個Future,可以運行直到完成.
    // If the future has already completed, the field is set to `None`.
    // This prevents us from polling a future after it has completed, which
    // would violate the contract of the `Future` trait. 等到Future完成后,欄位會被設定為 `None`. 這樣Future完成后,就不會再被輪詢
    a: Option<FutureA>,
    b: Option<FutureB>,
}

impl<FutureA, FutureB> SimpleFuture for Join<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        // Attempt to complete future `a`. 嘗試去完成一個 Future `a`
        if let Some(a) = &mut self.a {
            if let Poll::Ready(()) = a.poll(wake) {
                self.a.take();
            }
        }

        // Attempt to complete future `b`. 嘗試去完成一個 Future `b`
        if let Some(b) = &mut self.b {
            if let Poll::Ready(()) = b.poll(wake) {
                self.b.take();
            }
        }

        if self.a.is_none() && self.b.is_none() {
            // Both futures have completed -- we can return successfully 兩個 Future都已完成 - 我們可以成功地回傳了
            Poll::Ready(())
        } else {
            // One or both futures returned `Poll::Pending` and still have 至少還有一個 Future 沒有完成任務,因此回傳 `Poll::Pending`.
            // work to do. They will call `wake()` when progress can be made. 當該 Future 再次準備好時,通過呼叫`wake()`函式來繼續執行
            Poll::Pending
        }
    }
}

例子

多個連續的 Future 可以一個接一個的運行

/// A SimpleFuture that runs two futures to completion, one after another. 一個SimpleFuture, 它使用順序的方式,一個接一個地運行兩個Future
//
// Note: for the purposes of this simple example, `AndThenFut` assumes both 注意: 由于本例子用于演示,因此功能簡單,`AndThenFut` 會假設兩個 Future 在創建時就可用了.
// the first and second futures are available at creation-time. The real
// `AndThen` combinator allows creating the second future based on the output
// of the first future, like `get_breakfast.and_then(|food| eat(food))`. 而真實的`Andthen`允許根據第一個`Future`的輸出來創建第二個`Future`,因此復雜的多,
pub struct AndThenFut<FutureA, FutureB> {
    first: Option<FutureA>,
    second: FutureB,
}

impl<FutureA, FutureB> SimpleFuture for AndThenFut<FutureA, FutureB>
where
    FutureA: SimpleFuture<Output = ()>,
    FutureB: SimpleFuture<Output = ()>,
{
    type Output = ();
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if let Some(first) = &mut self.first {
            match first.poll(wake) {
                // We've completed the first future -- remove it and start on
                // the second! 我們已經完成了第一個 Future, 可以將它移除, 然后準備開始運行第二個
                Poll::Ready(()) => self.first.take(),
                // We couldn't yet complete the first future. 第一個 Future 還不能完成
                Poll::Pending => return Poll::Pending,
            };
        }
        // Now that the first future is done, attempt to complete the second. 運行到這里,說明第一個Future已經完成,嘗試去完成第二個
        self.second.poll(wake)
    }
}

真正的 Future trait

trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`: 首先值得注意的地方是,`self`的型別從`&mut self`變成了`Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`: 其次將`wake: fn()` 修改為 `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}
  • self 型別不再是 &mut self,而是 pin<&mut self>

    • 它允許我們創建不可移動的 future
  • 不可移動的物件可以在它們的欄位間存盤指標

  • 需要啟用async/awaitPin 就是必須的

  • wake: fn() 變成了 &mut Context<'_>

  • 在 SimpleFuture 里:

    • 我們通過呼叫函式指標 (fn()) 來告訴 Future 的執行器:相關的 Future 應該被 poll 了,
    • 由于 fn() 是一個函式指標,它不能存盤任何關于哪個 Future呼叫了 wake 的資料
  • Context 型別提供了訪問 Waker型別的值的方式,這些值可以被用來 wake up 特定的任務

    • 例如,實際專案中Web Server可能有上千個不同的連接,它們的wakeup 應單獨管理

總之,在正式場景要進行 wake ,就必須攜帶上資料, 而 Context 型別通過提供一個 Waker 型別的值,就可以用來喚醒特定的的任務,

2.2 使用 Waker 喚醒任務

Waker 型別的作用

  • Future在第一次poll的時候通常無法完成任務,所以Future需要保證在準備好取得更多進展后,可以再次被 poll
  • 每次 Future 被 poll,它都是作為一個任務的一部分
  • 任務(Task)就是被提交給執行者的頂層的 Future

Waker 型別

  • Waker 提供了 wake() 方法,它可以被用來告訴執行者:相關的任務應該被喚醒
  • 當 wake() 被呼叫,執行者知道 Waker 所關聯的任務已經準備好取得更多進展,Future應該被再次 poll
  • Waker 實作了 clone(),可以復制和存盤

例子

  • 使用 Waker 實作一個簡單的計時器 Future
    • 構建一個定時器
~/rust via ?? base
? cargo new timer_future
     Created binary (application) `timer_future` package

~/rust via ?? base
? cd timer_future

timer_future on  master [?] via ?? 1.67.1 via ?? base
? c

lib.rs 檔案

use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
/// 在Future和等待的執行緒間共享狀態
struct SharedState {
    /// Whether or not the sleep time has elapsed
    ///  定時(睡眠)是否結束
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    /// 當睡眠結束后,執行緒可以用`waker`通知`TimerFuture`來喚醒任務
    waker: Option<Waker>,
}

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        // Look at the shared state to see if the timer has already completed.
        // 通過檢查共享狀態,來確定定時器是否已經完成
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            // 設定`waker`,這樣新執行緒在睡眠(計時)結束后可以喚醒當前的任務,接著再次對`Future`進行`poll`操作,
            //
            // 下面的`clone`每次被`poll`時都會發生一次,實際上,應該是只`clone`一次更加合理,
            // 選擇每次都`clone`的原因是: `TimerFuture`可以在執行器的不同任務間移動,如果只克隆一次,
            // 那么獲取到的`waker`可能已經被篡改并指向了其它任務,最終導致執行器運行了錯誤的任務
            shared_state.waker = Some(cx.waker().clone());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    /// 創建一個新的`TimerFuture`,在指定的時間結束后,該`Future`可以完成
    pub fn new(duration: Duration) -> Self {
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread 創建新執行緒
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            thread::sleep(duration); //  睡眠指定時間實作計時功能
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            // 通知執行器定時器已經完成,可以繼續`poll`對應的`Future`了
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                waker.wake()
            }
        });

        TimerFuture { shared_state }
    }
}

2.3 構建一個執行器(Executor)

Future 執行者(Executor)

  • Future 是惰性的:除非驅動它們來完成,否則就什么都不做
  • 一種驅動方式是在 async 函式里使用 .await,但這只是把問題推到了上一層面
  • 誰來執行頂層 async 函式回傳的 Future?
    • 需要的是一個 Future 執行者
  • Future 執行者會獲取一系列頂層的 Future,通過在 Future 可以有進展的時候呼叫 poll,來將這些 Future 運行至完成
  • 通常執行者首先會對 Future進行 poll 一次,以便開始
  • 當 Future 通過呼叫 wake() 表示它們已經準備好取得進展時,它們就會被放回到一個佇列里,然后 poll 再次被呼叫,重復此操作直到 Future 完成

例子

  • 構建簡單的執行者,可以運行大量的頂層 Future 來并發的完成
  • 需要使用 future crate 的 ArcWake trait:
    • 它提供了簡單的方式用來組建 Waker

lib.rs 檔案

use std::{
    future::Future,
    pin::Pin,
    sync::{Arc, Mutex},
    task::{Context, Poll, Waker},
    thread,
    time::Duration,
};

pub struct TimerFuture {
    shared_state: Arc<Mutex<SharedState>>,
}

/// Shared state between the future and the waiting thread
/// 在Future和等待的執行緒間共享狀態
struct SharedState {
    /// Whether or not the sleep time has elapsed
    ///  定時(睡眠)是否結束
    completed: bool,

    /// The waker for the task that `TimerFuture` is running on.
    /// The thread can use this after setting `completed = true` to tell
    /// `TimerFuture`'s task to wake up, see that `completed = true`, and
    /// move forward.
    /// 當睡眠結束后,執行緒可以用`waker`通知`TimerFuture`來喚醒任務
    waker: Option<Waker>,
}

impl Future for TimerFuture {
    type Output = ();
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        println!("[{:?}] Polling TimerFuture...", thread::current().id());
        // Look at the shared state to see if the timer has already completed.
        // 通過檢查共享狀態,來確定定時器是否已經完成
        let mut shared_state = self.shared_state.lock().unwrap();
        if shared_state.completed {
            println!("[{:?}] TimerFuture completed...", thread::current().id());
            Poll::Ready(())
        } else {
            // Set waker so that the thread can wake up the current task
            // when the timer has completed, ensuring that the future is polled
            // again and sees that `completed = true`.
            //
            // It's tempting to do this once rather than repeatedly cloning
            // the waker each time. However, the `TimerFuture` can move between
            // tasks on the executor, which could cause a stale waker pointing
            // to the wrong task, preventing `TimerFuture` from waking up
            // correctly.
            //
            // N.B. it's possible to check for this using the `Waker::will_wake`
            // function, but we omit that here to keep things simple.
            // 設定`waker`,這樣新執行緒在睡眠(計時)結束后可以喚醒當前的任務,接著再次對`Future`進行`poll`操作,
            //
            // 下面的`clone`每次被`poll`時都會發生一次,實際上,應該是只`clone`一次更加合理,
            // 選擇每次都`clone`的原因是: `TimerFuture`可以在執行器的不同任務間移動,如果只克隆一次,
            // 那么獲取到的`waker`可能已經被篡改并指向了其它任務,最終導致執行器運行了錯誤的任務
            shared_state.waker = Some(cx.waker().clone());
            println!("[{:?}] TimerFuture pending...", thread::current().id());
            Poll::Pending
        }
    }
}

impl TimerFuture {
    /// Create a new `TimerFuture` which will complete after the provided
    /// timeout.
    /// 創建一個新的`TimerFuture`,在指定的時間結束后,該`Future`可以完成
    pub fn new(duration: Duration) -> Self {
        println!("[{:?}] 開始創建新的 TimerFuture...", thread::current().id());
        let shared_state = Arc::new(Mutex::new(SharedState {
            completed: false,
            waker: None,
        }));

        // Spawn the new thread 創建新執行緒
        let thread_shared_state = shared_state.clone();
        thread::spawn(move || {
            println!(
                "[{:?}] TimerFuture 生成新的執行緒并開始睡眠...",
                thread::current().id()
            );
            thread::sleep(duration); //  睡眠指定時間實作計時功能
            let mut shared_state = thread_shared_state.lock().unwrap();
            // Signal that the timer has completed and wake up the last
            // task on which the future was polled, if one exists.
            // 通知執行器定時器已經完成,可以繼續`poll`對應的`Future`了
            shared_state.completed = true;
            if let Some(waker) = shared_state.waker.take() {
                println!(
                    "[{:?}] TimerFuture 新執行緒獲得 waker,并進行 wake()...",
                    thread::current().id()
                );
                waker.wake()
            } else {
                println!(
                    "[{:?}] TimerFuture 新執行緒沒獲得 waker...",
                    thread::current().id()
                );
            }
        });

        println!("[{:?}] 回傳新的 TimerFuture...", thread::current().id());
        TimerFuture { shared_state }
    }
}

main.rs 檔案

use futures::{
    future::{BoxFuture, FutureExt},
    task::{waker_ref, ArcWake},
};
use std::{
    thread,
    future::Future,
    sync::mpsc::{sync_channel, Receiver, SyncSender},
    sync::{Arc, Mutex},
    task::Context,
    time::Duration,
};
// The timer we wrote in the previous section: 引入之前實作的定時器模塊
use timer_future::TimerFuture;

/// Task executor that receives tasks off of a channel and runs them.
/// 任務執行器,負責從通道中接收任務然后執行
struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

/// `Spawner` spawns new futures onto the task channel.
/// `Spawner`負責創建新的`Future`然后將它發送到任務通道中
#[derive(Clone)]
struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

/// A future that can reschedule itself to be polled by an `Executor`.
/// 一個Future,它可以調度自己(將自己放入任務通道中),然后等待執行器去`poll`
struct Task {
    /// In-progress future that should be pushed to completion. 進行中的Future,在未來的某個時間點會被完成
    ///
    /// The `Mutex` is not necessary for correctness, since we only have 按理來說`Mutex`在這里是多余的,因為我們只有一個執行緒來執行任務,但是由于
    /// one thread executing tasks at once. However, Rust isn't smart
    /// enough to know that `future` is only mutated from one thread, Rust并不聰明,它無法知道`Future`只會在一個執行緒內被修改,并不會被跨執行緒修改,因此
    /// so we need to use the `Mutex` to prove thread-safety. A production 我們需要使用`Mutex`來滿足這個笨笨的編譯器對執行緒安全的執著,
    /// executor would not need this, and could use `UnsafeCell` instead.
    ///  如果是生產級的執行器實作,不會使用`Mutex`,因為會帶來性能上的開銷,取而代之的是使用`UnsafeCell`
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// Handle to place the task itself back onto the task queue.
    /// 可以將該任務自身放回到任務通道中,等待執行器的poll
    task_sender: SyncSender<Arc<Task>>,
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // Maximum number of tasks to allow queueing in the channel at once. 任務通道允許的最大緩沖數(任務佇列的最大長度)
    // This is just to make `sync_channel` happy, and wouldn't be present in
    // a real executor. 當前的實作僅僅是為了簡單,在實際的執行中,并不會這么使用
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    println!("[{:?}] 生成 Executor 和 Spawner (含發送端、接收端)...", thread::current().id());
    (Executor { ready_queue }, Spawner { task_sender })
}

impl Spawner {
    // 把 Future 包裝成任務發送到通道
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        println!("[{:?}] 將 Future 組成 Task,放入 Channel...", thread::current().id());
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // Implement `wake` by sending this task back onto the task channel
        // so that it will be polled again by the executor.
        // 通過發送任務到任務管道的方式來實作`wake`,這樣`wake`后,任務就能被執行器`poll`
        println!("[{:?}] wake_by_ref...", thread::current().id());
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

impl Executor {
    fn run(&self) {
        println!("[{:?}] Executor running...", thread::current().id());
        while let Ok(task) = self.ready_queue.recv() { // 從通道不斷接收任務
            println!("[{:?}] 接收到任務...", thread::current().id());
            // Take the future, and if it has not yet completed (is still Some),
            // poll it in an attempt to complete it. 獲取一個future,若它還沒有完成(仍然是Some,不是None),則對它進行一次poll并嘗試完成它
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                println!("[{:?}] 從任務中取得 Future...", thread::current().id());
                // Create a `LocalWaker` from the task itself  基于任務自身創建一個 `LocalWaker`
                let waker = waker_ref(&task);
                println!("[{:?}] 獲得 waker by ref...", thread::current().id());
                let context = &mut Context::from_waker(&waker);
                // `BoxFuture<T>` is a type alias for  #`BoxFuture<T>`是`Pin<Box<dyn Future<Output = T> + Send + 'static>>`的型別別名
                // `Pin<Box<dyn Future<Output = T> + Send + 'static>>`.
                // We can get a `Pin<&mut dyn Future + Send + 'static>`
                // from it by calling the `Pin::as_mut` method.
                // 通過呼叫`as_mut`方法,可以將上面的型別轉換成`Pin<&mut dyn Future + Send + 'static>`
                println!("[{:?}] 獲得 context,準備進行 poll()...", thread::current().id());
                if future.as_mut().poll(context).is_pending() {
                    // We're not done processing the future, so put it
                    // back in its task to be run again in the future. Future還沒執行完,因此將它放回任務中,等待下次被poll
                    *future_slot = Some(future);
                    println!("[{:?}] Poll::Pending ====", thread::current().id());
                } else {
                    println!("[{:?}] Poll::Ready....", thread::current().id());
                }
            }
        }
        println!("[{:?}] Excutor run 結束", thread::current().id());
    }
}

fn main() {
    // 回傳一個執行者和一個任務的生成器
    let (executor, spawner) = new_executor_and_spawner();

    // Spawn a task to print before and after waiting on a timer. 生成一個任務 async塊是一個Future
    spawner.spawn(async {
        println!("[{:?}] howdy!", thread::current().id());
        // Wait for our timer future to complete after two seconds. 創建定時器Future,并等待它完成
        TimerFuture::new(Duration::new(2, 0)).await;
        println!("[{:?}] done!", thread::current().id());
    });

    // Drop the spawner so that our executor knows it is finished and won't
    // receive more incoming tasks to run. drop掉任務,這樣執行器就知道任務已經完成,不會再有新的任務進來
    println!("[{:?}] drop Spawner!", thread::current().id());
    drop(spawner);

    // Run the executor until the task queue is empty. 運行執行器直到任務佇列為空
    // This will print "howdy!", pause, and then print "done!". 任務運行后,會先列印`howdy!`, 暫停2秒,接著列印 `done!`
    executor.run();
}

運行

timer_future on  master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base took 2.9s 
? cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.06s
     Running `target/debug/timer_future`
[ThreadId(1)] 生成 Executor 和 Spawner (含發送端、接收端)...
[ThreadId(1)] 將 Future 組成 Task,放入 Channel...
[ThreadId(1)] drop Spawner!
[ThreadId(1)] Executor running...
[ThreadId(1)] 接收到任務...
[ThreadId(1)] 從任務中取得 Future...
[ThreadId(1)] 獲得 waker by ref...
[ThreadId(1)] 獲得 context,準備進行 poll()...
[ThreadId(1)] howdy!
[ThreadId(1)] 開始創建新的 TimerFuture...
[ThreadId(1)] 回傳新的 TimerFuture...
[ThreadId(1)] Polling TimerFuture...
[ThreadId(1)] TimerFuture pending...
[ThreadId(1)] Poll::Pending ====
[ThreadId(2)] TimerFuture 生成新的執行緒并開始睡眠...
[ThreadId(2)] TimerFuture 新執行緒獲得 waker,并進行 wake()...
[ThreadId(2)] wake_by_ref...
[ThreadId(1)] 接收到任務...
[ThreadId(1)] 從任務中取得 Future...
[ThreadId(1)] 獲得 waker by ref...
[ThreadId(1)] 獲得 context,準備進行 poll()...
[ThreadId(1)] Polling TimerFuture...
[ThreadId(1)] TimerFuture completed...
[ThreadId(1)] done!
[ThreadId(1)] Poll::Ready....
[ThreadId(1)] Excutor run 結束

timer_future on  master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base took 2.6s 
? 

2.4 執行器和系統IO

FutureSocket 上執行異步讀取

  • 這個 Future 會讀取 socket 上可用的資料
  • 如果沒有資料,它就屈服于執行者:請求當 socket 再次可讀時,喚醒它的任務
  • 本例中我們不知道 Socket 型別是如何實作的,尤其不知道 set_readable_callback 函式如何作業,那么如何在 socket 再次可讀時安排呼叫 wake() 呢?
    • 一種辦法是使用一個執行緒不斷檢查 socket 是否可讀(低效!)
pub struct SocketRead<'a> {
    socket: &'a Socket,
}

impl SimpleFuture for SocketRead<'_> {
    type Output = Vec<u8>;

    fn poll(&mut self, wake: fn()) -> Poll<Self::Output> {
        if self.socket.has_data_to_read() {
            // The socket has data -- read it into a buffer and return it. 
           // socket有資料,寫入buffer中并回傳
            Poll::Ready(self.socket.read_buf())
        } else {
            // The socket does not yet have data. socket中還沒資料
            //
            // Arrange for `wake` to be called once data is available.
            // When data becomes available, `wake` will be called, and the
            // user of this `Future` will know to call `poll` again and
            // receive data.
          	// 注冊一個`wake`函式,當資料可用時,該函式會被呼叫,
						// 然后當前Future的執行器會再次呼叫`poll`方法,此時就可以讀取到資料
            self.socket.set_readable_callback(wake);
            Poll::Pending
        }
    }
}
  • 實際中,這個問題通過與IO感知的系統阻塞原語(primitive)的集成來解決
    • 例如 Linux 中的 epollFreeBSDmacOS 中的 kqueueWindows 中的 IOCP, Fuchisa中的 ports
    • 所有這些都是通過 Rust 跨平臺包的 mio crate 來暴露的
  • 這些原語(primitive)都允許執行緒阻塞多個異步 IO 事件,并在其中一個事件完成后回傳,
struct IoBlocker {
    /* ... */
}

struct Event {
    // An ID uniquely identifying the event that occurred and was listened for.
    // Event的唯一ID,該事件發生后,就會被監聽起來
    id: usize,

    // A set of signals to wait for, or which occurred.
    // 一組需要等待或者已發生的信號
    signals: Signals,
}

impl IoBlocker {
    /// Create a new collection of asynchronous IO events to block on.
    /// 創建需要阻塞等待的異步IO事件的集合
    fn new() -> Self { /* ... */ }

    /// Express an interest in a particular IO event.
    /// 對指定的IO事件表示興趣
    fn add_io_event_interest(
        &self,

        /// The object on which the event will occur
        /// 事件所系結的socket
        io_object: &IoObject,

        /// A set of signals that may appear on the `io_object` for
        /// which an event should be triggered, paired with
        /// an ID to give to events that result from this interest.
        event: Event,
    ) { /* ... */ }

    /// Block until one of the events occurs.
    /// 進入阻塞,直到某個事件出現
    fn block(&self) -> Event { /* ... */ }
}

let mut io_blocker = IoBlocker::new();
io_blocker.add_io_event_interest(
    &socket_1,
    Event { id: 1, signals: READABLE },
);
io_blocker.add_io_event_interest(
    &socket_2,
    Event { id: 2, signals: READABLE | WRITABLE },
);
let event = io_blocker.block();

// prints e.g. "Socket 1 is now READABLE" if socket one became readable.
// 當socket的資料可以讀取時,列印 "Socket 1 is now READABLE" 
println!("Socket {:?} is now {:?}", event.id, event.signals);

Socket::set_readable_callback 的偽代碼

  • Future 執行者可以使用這些原語提供異步 IO 物件,例如 socket,它就可以當特定 IO 事件發生時通過配置回呼來運行
  • 針對我們 SocketRead 例子,Socket::set_readable_callback 的偽代碼大致如下:
impl Socket {
    fn set_readable_callback(&self, waker: Waker) {
        // `local_executor` is a reference to the local executor.
        // this could be provided at creation of the socket, but in practice
        // many executor implementations pass it down through thread local
        // storage for convenience.
        let local_executor = self.local_executor;

        // Unique ID for this IO object.
        let id = self.id;

        // Store the local waker in the executor's map so that it can be called
        // once the IO event arrives.
        local_executor.event_map.insert(id, waker);
        local_executor.add_io_event_interest(
            &self.socket_file_descriptor,
            Event { id, signals: READABLE },
        );
    }
}

現在,我們就只有一個執行者執行緒,它可以接收 IO事件,并將它們分配到適合的 Waker,這將喚醒相應的任務,并允許執行者在回傳檢查更多的IO事件之前,驅動更多的任務完成(回圈繼續...),

這樣,我們只需要一個執行器執行緒,它會接收IO事件并將其分發到對應的 Waker 中,接著后者會喚醒相關的任務,最終通過執行器 poll 后,任務可以順利的繼續執行, 這種IO讀取流程可以不停的回圈,直到 socket 關閉,

三、async & .await

什么是 async/.await

  • async/.await 是 Rust 的特殊語法,在發生阻塞時,它讓放棄當前執行緒的控制權成為可能,這就允許在等待操作完成的時候,允許其它代碼取得進展

使用 async 的兩種方式

有兩種方式可以使用asyncasync fn用于宣告函式,async { ... }用于宣告陳述句塊,它們會回傳一個實作 Future 特征的值:

  • async 和 async blocks:
    • 都回傳實作了 Future trait 的值
  • async 體和其它 future都是惰性的:
    • 在真正運行之前什么都不做
  • 使用 .await是最常見的運行future 的方式:
    • 對 future 使用 .await就會嘗試驅動Future運行至完成
  • 如果 Future 被阻塞:
    • 它會放棄當前執行緒的控制權
    • 當可取得更多進展時,執行器會撿起這個 Future 并恢復執行,最終由 .await 完成決議
// `foo()` returns a type that implements `Future<Output = u8>`.
// `foo().await` will result in a value of type `u8`.
// `foo()`回傳一個`Future<Output = u8>`,
// 當呼叫`foo().await`時,該`Future`將被運行,當呼叫結束后我們將獲取到一個`u8`值
async fn foo() -> u8 { 5 }

fn bar() -> impl Future<Output = u8> {
    // This `async` block results in a type that implements
    // `Future<Output = u8>`.
    // 下面的`async`陳述句塊回傳`Future<Output = u8>`
    async {
        let x: u8 = foo().await;
        x + 5
    }
}

例子

use async_std::io::prelude::*;
use async_std::net;
use async_std::task;

// 異步函式以 async fn 開始  里面由3個異步函式 .await
// 無需調整 async fn 的回傳型別,Rust 自動把它當成相應的 Future 型別
// 回傳的 Future 包含所需相關資訊:引數、本地變數空間...
// Future 的具體型別由編譯器基于函式體和引數自動生成
// 該型別沒有名稱
// 它實作了 Future<Output=R>
// 第一次對 cheapo_request 進行 poll 時:
// 從函式體頂部開始執行
// 直到第一個 await(針對 TcpStream::connect 回傳的 Future)
// 隨著 cheapo_request 的 Future 不斷被 poll,其執行就是從一個 await 到下一個 await,而且只有子 Future變成 Ready 之后才繼續
// cheapo_reauest 的 Future 會追蹤:
// 下一次 poll 應恢復繼續的那個店
// 以及所需的本地狀態(變數、引數、臨時變數等)
// 這種途中能暫停執行,然后恢復執行的能力是 async 所獨有的
// 由于 await 運算式依賴于“可恢復執行”這個特性,所以 await 只能用在 async 里
// 暫停執行時執行緒在做什么?
// 它不是在干等,而是在做其它作業,
async fn cheapo_request(host: &str, port: u16, path: &str) -> std::io::Result<String> {
  // .await 會等待,直到 future 變成 ready, await 最侄訓決議出 future 的值
  // connect 當呼叫 async 函式時,在其函式體執行前,它就會立即回傳
  // 這個 await 運算式會對 connect 的Future進行 poll:
  // 如果沒完成 -> 回傳 Pending
  // 針對 cheapo_request 的 poll 也無法繼續,
  // 直到 connect 的 Future 回傳 Ready
  let mut socket = net::TcpStream::connect((host, port)).await?;
  let request = format!("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n", path, host);
  
  socket.write_all(request.as_bytes()).await?;
  socket.shutdown(net::Shutdown::Write)?;
  let mut response = String::new();
  socket.read_to_string(&mut response).await?;
  Ok(response)
}

fn main() -> std::io::Result<()> {
  // 注意:
  // 下一次對 cheapo_request 的 Future 進行 poll 時:
  // 并不在函式體頂部開始執行
  // 它會在 connect Future 進行 poll 的地方繼續執行
  // 直到它變成 Ready,才會繼續在函式體往下走
  let response = task::block_on(cheapo_request("example.com", 80, "/"))?;
  println!("{}", response);
  Ok(())
}
  • await:
    • 獲得 Future 的所有權,并對其進行 poll
    • 如果 Future Ready,其最終值就是 await 運算式的值,這時執行就可以繼續了
    • 否則就回傳 Pending 給呼叫者

async 的生命周期

  • 與傳統函式不同:async fn,如果它的引數是參考或是其它非 'static 的,那么它回傳的 Future 就會系結到引數的生命周期上,
  • 這意味著 async fn 回傳的 future,在 .await 的同時,fn 的非 'static 的引數必須保持有效
// This function:
async fn foo(x: &u8) -> u8 { *x }

// Is equivalent to this function:
// 上面的函式跟下面的函式是等價的:
fn foo_expanded<'a>(x: &'a u8) -> impl Future<Output = u8> + 'a {
    async move { *x }
}

存盤 future 或傳遞 future

  • 通常,async 的函式在呼叫后會立即 .await,這就不是問題:
    • 例如:foo(&x).await
  • 如果存盤 future 或將其傳遞給其它任務或執行緒,就有問題了...
  • 一種變通解決辦法:
    • 思路:把使用參考作為引數的 async fn 轉為一個 'static future
    • 做法:在 async 塊里,將引數和 async fn 的 呼叫捆綁到一起(延長引數的生命周期來匹配 future)
fn bad() -> impl Future<Output = u8> {
    let x = 5;
    borrow_x(&x) // ERROR: `x` does not live long enough
}

// 將引數和對 async fn 的呼叫放在同一個 async 陳述句塊
fn good() -> impl Future<Output = u8> {
    async {
        let x = 5;
        borrow_x(&x).await
    }
}

以上代碼會報錯,因為 x 的生命周期只到 bad 函式的結尾, 但是 Future 顯然會活得更久

通過將引數移動到 async 陳述句塊內, 將它的生命周期擴展到 'static, 并跟回傳的 Future 保持了一致,

async move

  • async 塊和閉包都支持 move
  • async move 塊會獲得其參考變數的所有權:
    • 允許其比當前所在的作用域活得長
    • 但同時也放棄了與其它代碼共享這些變數的能力
/// `async` block:
///
/// Multiple different `async` blocks can access the same local variable
/// so long as they're executed within the variable's scope
// 多個不同的 `async` 陳述句塊可以訪問同一個本地變數,只要它們在該變數的作用域內執行
async fn blocks() {
    let my_string = "foo".to_string();

    let future_one = async {
        // ...
        println!("{my_string}");
    };

    let future_two = async {
        // ...
        println!("{my_string}");
    };

    // Run both futures to completion, printing "foo" twice:
    // 運行兩個 Future 直到完成
    let ((), ()) = futures::join!(future_one, future_two);
}

/// `async move` block:
///
/// Only one `async move` block can access the same captured variable, since
/// captures are moved into the `Future` generated by the `async move` block.
/// However, this allows the `Future` to outlive the original scope of the
/// variable:
// 由于`async move`會捕獲環境中的變數,因此只有一個`async move`陳述句塊可以訪問該變數,
// 但是它也有非常明顯的好處: 變數可以轉移到回傳的 Future 中,不再受借用生命周期的限制
fn move_block() -> impl Future<Output = ()> {
    let my_string = "foo".to_string();
    async move {
        // ...
        println!("{my_string}");
    }
}

在多執行緒執行者上進行 .await

  • 當使用多執行緒 future 執行者時,future 就可以在執行緒間移動:
    • 所以 async 體里面用的變數必須能夠在執行緒間移動
    • 因為任何的 .await 都可能導致切換到一個新執行緒
  • 這意味著使用以下型別時不安全的:
    • Rc、&RefCell 和任何其它沒有實作 Send trait 的型別,包括沒實作 Sync trait 的參考
    • 注意:呼叫 .await 時,只要這些型別不在作用域內,就可以使用它們,
  • 在跨域一個 .await 期間,持有傳統的、對 future 無感知的鎖,也不是好主意:
    • 可導致執行緒池鎖定
    • 為此,可使用 futures::lock 里的 Mutex 而不是 std::sync 里的

四、Pinning

什么是 Pin

  • Pin 與 Unpin 標記一起作業
  • Pin 會保證實作了 !Unpin 的物件永遠不會被移動

為什么需要 Pin?

let fut_one = /* ... */; // Future 1
let fut_two = /* ... */; // Future 2
async move {
    fut_one.await;
    fut_two.await;
}
  • 這會創建一個實作了 Future trait 的匿名型別
  • 提供一個和下面代碼類似的 poll 方法
// The `Future` type generated by our `async { ... }` block
// `async { ... }`陳述句塊創建的 `Future` 型別
struct AsyncFuture {
    fut_one: FutOne,
    fut_two: FutTwo,
    state: State,
}

// List of states our `async` block can be in
// `async` 陳述句塊可能處于的狀態
enum State {
    AwaitingFutOne,
    AwaitingFutTwo,
    Done,
}

impl Future for AsyncFuture {
    type Output = ();

    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        loop {
            match self.state {
                State::AwaitingFutOne => match self.fut_one.poll(..) {
                    Poll::Ready(()) => self.state = State::AwaitingFutTwo,
                    Poll::Pending => return Poll::Pending,
                }
                State::AwaitingFutTwo => match self.fut_two.poll(..) {
                    Poll::Ready(()) => self.state = State::Done,
                    Poll::Pending => return Poll::Pending,
                }
                State::Done => return Poll::Ready(()),
            }
        }
    }
}

poll 第一次被呼叫時,它會去查詢 fut_one 的狀態,若 fut_one 無法完成,則 poll 方法會回傳,未來對 poll 的呼叫將從上一次呼叫結束的地方開始,該程序會一直持續,直到 Future 完成為止,

如果上例中 async 塊使用參考,會如何?

async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}

這段代碼會編譯成下面的形式:

struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // 指向下面的`x`欄位
}

struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}

這里,ReadIntoBuf 擁有一個參考欄位,指向了結構體的另一個欄位 x ,一旦 AsyncFuture 被移動,那 x 的地址也將隨之變化,此時對 x 的參考就變成了不合法的,

  • 把 Future Pin(釘)到記憶體中的特定位置會防止該問題的發生:
    • 可以在 async 塊里安全的創建到值的參考

Pin 介紹

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
}

impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
        }
    }

    fn init(&mut self) {
        let self_ref: *const String = &self.a;
        self.b = self_ref;
    }

    fn a(&self) -> &str {
        &self.a
    }

    fn b(&self) -> &String {
        assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
        unsafe { &*(self.b) }
    }
}

fn main() {
    let mut test1 = Test::new("test1");
    test1.init();
    let mut test2 = Test::new("test2");
    test2.init();

    println!("a: {}, b: {}", test1.a(), test1.b());
    println!("a: {}, b: {}", test2.a(), test2.b());

}

運行輸出

a: test1, b: test1
a: test2, b: test2

修改之后

fn main() {
    let mut test1 = Test::new("test1");
    test1.init();
    let mut test2 = Test::new("test2");
    test2.init();

    println!("a: {}, b: {}", test1.a(), test1.b());
    std::mem::swap(&mut test1, &mut test2); // 交換 test1 和 test2 移動資料
    println!("a: {}, b: {}", test2.a(), test2.b());

}

輸出

a: test1, b: test1
a: test1, b: test2

Pin 的實踐

  • Pin 型別會包裹指標型別,保證指標指向的值不被移動,
  • 例如:Pin<&mut T>,Pin<&T>, Pin<Box>
    • 即使 T:!Unpin,也不能保證 T 不被移動

Unpin trait

  • 大多數型別如果被移動,不會造成問題,它們實作了 Unpin
  • 指向 Unpin 型別的指標,可自由的放入或從 Pin 中取出
    • 例如:u8 是 Unpin 的,Pin<&mut u8> 和普通的 &mut u8 一樣
  • 如果型別擁有 !Unpin 標記,那么在 Pin 之后它們就無法移動了
use std::pin::Pin;
use std::marker::PhantomPinned;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}


impl Test {
    fn new(txt: &str) -> Self {
        Test {
            a: String::from(txt),
            b: std::ptr::null(),
            _marker: PhantomPinned, // 這個標記可以讓我們的型別自動實作特征`!Unpin`
        }
    }

    fn init(self: Pin<&mut Self>) {
        let self_ptr: *const String = &self.a;
        let this = unsafe { self.get_unchecked_mut() };
        this.b = self_ptr;
    }

    fn a(self: Pin<&Self>) -> &str {
        &self.get_ref().a
    }

    fn b(self: Pin<&Self>) -> &String {
        assert!(!self.b.is_null(), "Test::b called without Test::init being called first");
        unsafe { &*(self.b) }
    }
}

上面代碼中,我們使用了一個標記型別 PhantomPinned 將自定義結構體 Test 變成了 !Unpin (編譯器會自動幫我們實作),因此該結構體無法再被移動,

一旦型別實作了 !Unpin ,那將它的值固定到堆疊( stack )上就是不安全的行為,因此在代碼中我們使用了 unsafe 陳述句塊來進行處理,你也可以使用 pin_utils 來避免 unsafe 的使用,

pub fn main() {
    // test1 is safe to move before we initialize it
    // 此時的`test1`可以被安全的移動
    let mut test1 = Test::new("test1");
    // Notice how we shadow `test1` to prevent it from being accessed again
    // 新的`test1`由于使用了`Pin`,因此無法再被移動,這里的宣告會將之前的`test1`遮蔽掉(shadow)
    let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
    Test::init(test1.as_mut());

    let mut test2 = Test::new("test2");
    let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
    Test::init(test2.as_mut());

    println!("a: {}, b: {}", Test::a(test1.as_ref()), Test::b(test1.as_ref()));
    println!("a: {}, b: {}", Test::a(test2.as_ref()), Test::b(test2.as_ref()));
}

嘗試進行交換

pub fn main() {
    let mut test1 = Test::new("test1");
    let mut test1 = unsafe { Pin::new_unchecked(&mut test1) };
    Test::init(test1.as_mut());

    let mut test2 = Test::new("test2");
    let mut test2 = unsafe { Pin::new_unchecked(&mut test2) };
    Test::init(test2.as_mut());

    println!("a: {}, b: {}", Test::a(test1.as_ref()), Test::b(test1.as_ref()));
    std::mem::swap(test1.get_mut(), test2.get_mut()); // 報錯 
    println!("a: {}, b: {}", Test::a(test2.as_ref()), Test::b(test2.as_ref()));
}

Pinning to the Heap 固定到堆上

use std::pin::Pin;
use std::marker::PhantomPinned;

#[derive(Debug)]
struct Test {
    a: String,
    b: *const String,
    _marker: PhantomPinned,
}

impl Test {
    fn new(txt: &str) -> Pin<Box<Self>> {
        let t = Test {
            a: String::from(txt),
            b: std::ptr::null(),
            _marker: PhantomPinned,
        };
        let mut boxed = Box::pin(t);
        let self_ptr: *const String = &boxed.a;
        unsafe { boxed.as_mut().get_unchecked_mut().b = self_ptr };

        boxed
    }

    fn a(self: Pin<&Self>) -> &str {
        &self.get_ref().a
    }

    fn b(self: Pin<&Self>) -> &String {
        unsafe { &*(self.b) }
    }
}

pub fn main() {
    let test1 = Test::new("test1");
    let test2 = Test::new("test2");

    println!("a: {}, b: {}",test1.as_ref().a(), test1.as_ref().b());
    println!("a: {}, b: {}",test2.as_ref().a(), test2.as_ref().b());
}

將一個 !Unpin 型別的值固定到堆上,會給予該值一個穩定的記憶體地址,它指向的堆中的值在 Pin 后是無法被移動的,而且與固定在堆疊上不同,我們知道堆上的值在整個生命周期內都會被穩穩地固定住,

總結

  • 如果 T:Unpin (默認情況),那么 Pin<'a, T> 與 &'a mut T 完全等價

    • Unpin 意味著該型別如果被 Pin 了,那么它也是可以移動的,所以 Pin 對這種型別不起作用
  • 如果 T:!Unpin,那么把 &mut T 變成 Pin 的 T,需要 unsafe 操作

  • 大多數標準庫型別都實作了 Unpin,Rust 里大部分正常型別也是,由 async/await 生成的 Future 是個例外

  • 可以使用特性標記為型別添加一個 !Unpin 系結(最新版),或者通過添加 std::marker::PhantomPinned到型別上(穩定版)

  • 可以將資料 Pin 到 Stack 或 Heap 上

  • 把 !Unpin 物件 Pin 到 Stack 上需要 unsafe 操作

  • 把 !Unpin duix Pin 到 Heap 上不需要 unsafe 操作

    • 快捷操作:使用 Box::pin
  • 針對已經 Pin 的資料,如果它是 T: !Unpin 的,則需要保證它從被 Pin 后,記憶體一直有效且不會調整其用途,直到 drop 被呼叫,

    • 這是 Pin 合約的重要部分
  • T: Unpin ( Rust 型別的默認實作),那么 Pin<'a, T>&'a mut T 完全相同,也就是 Pin 將沒有任何效果, 該移動還是照常移動

  • 絕大多數標準庫型別都實作了 Unpin ,事實上,對于 Rust 中你能遇到的絕大多數型別,該結論依然成立 ,其中一個例外就是:async/await 生成的 Future 沒有實作 Unpin

  • 你可以通過以下方法為自己的型別添加!Unpin約束:

    • 使用文中提到的 std::marker::PhantomPinned
    • 使用nightly 版本下的 feature flag
  • 可以將值固定到堆疊上,也可以固定到堆上

    • !Unpin 值固定到堆疊上需要使用 unsafe
    • !Unpin 值固定到堆上無需 unsafe ,可以通過 Box::pin 來簡單的實作
  • 當固定型別T: !Unpin時,你需要保證資料從被固定到被drop這段時期內,其記憶體不會變得非法或者被重用

五、Streams

Stream trait

  • Stream trait 和 Future trait 類似,但它可以在完成前產生多個值,這點和標準庫 Iterator trait 也很像
trait Stream {
    /// The type of the value yielded by the stream.
    // Stream生成的值的型別
    type Item;

    /// Attempt to resolve the next item in the stream.
    /// Returns `Poll::Pending` if not ready, `Poll::Ready(Some(x))` if a value
    /// is ready, and `Poll::Ready(None)` if the stream has completed.
    // 嘗試去決議Stream中的下一個值,
    // 若無資料,回傳`Poll::Pending`, 若有資料,回傳 `Poll::Ready(Some(x))`, `Stream`完成則回傳 `Poll::Ready(None)`
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>)
        -> Poll<Option<Self::Item>>;
}

關于 Stream 的一個常見例子是訊息通道( futures 包中的)的消費者 Receiver,每次有訊息從 Send 端發送后,它都可以接收到一個 Some(val) 值, 一旦 Send 端關閉(drop),且訊息通道中沒有訊息后,它會接收到一個 None 值,

async fn send_recv() {
    const BUFFER_SIZE: usize = 10;
    let (mut tx, mut rx) = mpsc::channel::<i32>(BUFFER_SIZE);

    tx.send(1).await.unwrap();
    tx.send(2).await.unwrap();
    drop(tx);

    // `StreamExt::next` is similar to `Iterator::next`, but returns a
    // type that implements `Future<Output = Option<T>>`.
    // `StreamExt::next` 類似于 `Iterator::next`, 但是前者回傳的不是值,而是一個 `Future<Output = Option<T>>`,
    // 因此還需要使用`.await`來獲取具體的值
    assert_eq!(Some(1), rx.next().await);
    assert_eq!(Some(2), rx.next().await);
    assert_eq!(None, rx.next().await);
}

Iteration and Concurrency 迭代與并發

  • 與同步的 Iterator 類似,有很多方法迭代和處理 Stream 中的值:
    • 組合器風格的:map、filter、fold
    • 相應的 "early-exit-on-error" 版本:try_map、try_filter、try_fold
  • for 回圈無法和 Stream 一起使用
  • 命令式的 while let 和 next/try_next 函式可以與 Stream 一起使用

例子

async fn sum_with_next(mut stream: Pin<&mut dyn Stream<Item = i32>>) -> i32 {
    use futures::stream::StreamExt; // for `next`
    let mut sum = 0;
    while let Some(item) = stream.next().await {
        sum += item;
    }
    sum
}

async fn sum_with_try_next(
    mut stream: Pin<&mut dyn Stream<Item = Result<i32, io::Error>>>,
) -> Result<i32, io::Error> {
    use futures::stream::TryStreamExt; // for `try_next`
    let mut sum = 0;
    while let Some(item) = stream.try_next().await? {
        sum += item;
    }
    Ok(sum)
}

上面代碼是一次處理一個值的模式,但是需要注意的是:如果你選擇一次處理一個值的模式,可能會造成無法并發,這就失去了異步編程的意義, 因此,如果可以的話我們還是要選擇從一個 Stream 并發處理多個值的方式,通過 for_each_concurrenttry_for_each_concurrent 方法來實作:

async fn jump_around(
    mut stream: Pin<&mut dyn Stream<Item = Result<u8, io::Error>>>,
) -> Result<(), io::Error> {
    use futures::stream::TryStreamExt; // for `try_for_each_concurrent`
    const MAX_CONCURRENT_JUMPERS: usize = 100;

    stream.try_for_each_concurrent(MAX_CONCURRENT_JUMPERS, |num| async move {
        jump_n_times(num).await?;
        report_n_jumps(num).await?;
        Ok(())
    }).await?;

    Ok(())
}

六、Executing Multiple Futures at a Time(同時執行多個 Future)

本節內容

  • 真正的異步應用程式通常需要同時執行幾個不同的操作
  • 介紹一些可同時執行多個異步操作的方式:
    • Join!,等待所有 Future 完成
    • Select!,等待多個 future 中的一個完成
    • Spawning,創建一個頂級任務,他會運行一個 future 直至完成
    • FutureUnordered,一組 Future,它們會產生每個子 Future 的結果

6.1 join!

join!

  • futures::join 宏,它使得在等待多個 future完成的時候,可以同時并發的執行它們,

例子

async fn get_book_and_music() -> (Book, Music) {
    let book = get_book().await;
    let music = get_music().await;
    (book, music)
}

要支持同時看書和聽歌,有些人可能會生成下面代碼:

// WRONG -- don't do this
async fn get_book_and_music() -> (Book, Music) {
    let book_future = get_book();
    let music_future = get_music();
    (book_future.await, music_future.await)
}

為了正確的并發運行兩個 Future , 我們來試試 futures::join! 宏:

use futures::join;

async fn get_book_and_music() -> (Book, Music) {
    let book_fut = get_book();
    let music_fut = get_music();
    join!(book_fut, music_fut)
}

try_join!

  • 對于回傳 Result 的 future,更考慮使用 try_join!
    • 如果子 future 中某一個回傳了錯誤,try_join! 會立即完成

例子

use futures::try_join;

async fn get_book() -> Result<Book, String> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book();
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}

有一點需要注意,傳給 try_join! 的所有 Future 都必須擁有相同的錯誤型別,如果錯誤型別不同,可以考慮使用來自 futures::future::TryFutureExt 模塊的 map_errerr_info方法將錯誤進行轉換:

use futures::{
    future::TryFutureExt,
    try_join,
};

async fn get_book() -> Result<Book, ()> { /* ... */ Ok(Book) }
async fn get_music() -> Result<Music, String> { /* ... */ Ok(Music) }

async fn get_book_and_music() -> Result<(Book, Music), String> {
    let book_fut = get_book().map_err(|()| "Unable to get book".to_string());
    let music_fut = get_music();
    try_join!(book_fut, music_fut)
}

6.2 select!

select!

  • futures::select 宏可同時運行多個 future,允許用戶在任意一個 future 完成時進行回應
use futures::{
    future::FutureExt, // for `.fuse()`
    pin_mut,
    select,
};

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}

default => ... 和 complete => ...

  • select 支持 default 和 complete 分支
  • default:如果選中的 future 尚未完成,就會運行 default 分支
    • 擁有 default 的 select 總是會立即回傳
  • complete:它用于所有選中的 future 都已完成的情況,
  • complete 分支當所有的 FutureStream 完成后才會被執行,它往往配合loop使用,loop用于回圈完成所有的 Future
  • default分支,若沒有任何 FutureStream 處于 Ready 狀態, 則該分支會被立即執行

例子

use futures::{future, select};

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)   該分支永遠不會運行,因為`Future`會先運行,然后是`complete`
        };
    }
    assert_eq!(total, 10);
}

與 Unpin 和 FusedFuture 互動

  • 前面的例子中,需要在回傳的 future 上呼叫 .fuse(),也呼叫了 pin_mut,
    • 因為 select 里面的 future 必須使用 Unpin 和 FusedFuture 這兩個 trait,
  • 必須 Unpin:select 使用的 future 不是按值的,而是按可變參考,
    • 未完成的 future 在呼叫 select 后 仍可使用
  • 必須 FusedFuture:在 future 完成后,select 不可以對它進行 poll
    • 實作 FusedFuture 的 future 會追蹤其完成狀態,這樣在 select 回圈里,就只會 poll 沒有完成的 future

Stream 也有 FusedStream trait

首先,.fuse()方法可以讓 Future 實作 FusedFuture 特征, 而 pin_mut! 宏會為 Future 實作 Unpin特征,這兩個特征恰恰是使用 select 所必須的:

  • Unpin,由于 select 不會通過拿走所有權的方式使用Future,而是通過可變參考的方式去使用,這樣當 select 結束后,該 Future 若沒有被完成,它的所有權還可以繼續被其它代碼使用,
  • FusedFuture的原因跟上面類似,當 Future 一旦完成后,那 select 就不能再對其進行輪詢使用,Fuse意味著熔斷,相當于 Future 一旦完成,再次呼叫poll會直接回傳Poll::Pending

只有實作了FusedFutureselect 才能配合 loop 一起使用,假如沒有實作,就算一個 Future 已經完成了,它依然會被 select 不停的輪詢執行,

Stream 稍有不同,它們使用的特征是 FusedStream, 通過.fuse()(也可以手動實作)實作了該特征的 Stream,對其呼叫.next().try_next()方法可以獲取實作了FusedFuture特征的Future:

use futures::{
    stream::{Stream, StreamExt, FusedStream},
    select,
};

async fn add_two_streams(
    mut s1: impl Stream<Item = u8> + FusedStream + Unpin,
    mut s2: impl Stream<Item = u8> + FusedStream + Unpin,
) -> u8 {
    let mut total = 0;

    loop {
        let item = select! {
            x = s1.next() => x,
            x = s2.next() => x,
            complete => break,
        };
        if let Some(next_num) = item {
            total += next_num;
        }
    }

    total
}

select 回圈里使用 Fuse 和 FuturesUnordered

  • Fuse::terminated(),允許構建空的、已完成的 future,后續可以為它填充一個需要運行的 future
    • 適用于在 select 回圈里產生且需要在這運行的任務,這種場景
  • 當同個 future 的多個副本需同時運行時,使用 FuturesUnordered 型別
use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) { /* ... */ }

async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let run_on_new_num_fut = run_on_new_num(starting_num).fuse();
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(run_on_new_num_fut, get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                // 定時器已結束,若`get_new_num_fut`沒有在運行,就創建一個新的
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived -- start a new `run_on_new_num_fut`,
                // dropping the old one.
                // 收到新的數字 -- 創建一個新的`run_on_new_num_fut`并丟棄掉舊的
                run_on_new_num_fut.set(run_on_new_num(new_num).fuse());
            },
            // Run the `run_on_new_num_fut` // 運行 `run_on_new_num_fut`
            () = run_on_new_num_fut => {},
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
            // 若所有任務都完成,直接 `panic`, 原因是 `interval_timer` 應該連續不斷的產生值,而不是結束          //后,執行到 `complete` 分支
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}

當某個 Future 有多個拷貝都需要同時運行時,可以使用 FuturesUnordered 型別,下面的例子跟上個例子大體相似,但是它會將 run_on_new_num_fut 的每一個拷貝都運行到完成,而不是像之前那樣一旦創建新的就終止舊的,

use futures::{
    future::{Fuse, FusedFuture, FutureExt},
    stream::{FusedStream, FuturesUnordered, Stream, StreamExt},
    pin_mut,
    select,
};

async fn get_new_num() -> u8 { /* ... */ 5 }

async fn run_on_new_num(_: u8) -> u8 { /* ... */ 5 }

// 使用從 `get_new_num` 獲取的最新數字 來運行 `run_on_new_num`
//
// 每當計時器結束后,`get_new_num` 就會運行一次,它會立即取消當前正在運行的`run_on_new_num` ,
// 并且使用新回傳的值來替換 
async fn run_loop(
    mut interval_timer: impl Stream<Item = ()> + FusedStream + Unpin,
    starting_num: u8,
) {
    let mut run_on_new_num_futs = FuturesUnordered::new();
    run_on_new_num_futs.push(run_on_new_num(starting_num));
    let get_new_num_fut = Fuse::terminated();
    pin_mut!(get_new_num_fut);
    loop {
        select! {
            () = interval_timer.select_next_some() => {
                // The timer has elapsed. Start a new `get_new_num_fut`
                // if one was not already running.
                // 定時器已結束,若`get_new_num_fut`沒有在運行,就創建一個新的
                if get_new_num_fut.is_terminated() {
                    get_new_num_fut.set(get_new_num().fuse());
                }
            },
            new_num = get_new_num_fut => {
                // A new number has arrived -- start a new `run_on_new_num_fut`.
                // 收到新的數字 -- 創建一個新的`run_on_new_num_fut` (并沒有像之前的例子那樣丟棄掉舊值)
                run_on_new_num_futs.push(run_on_new_num(new_num));
            },
            // Run the `run_on_new_num_futs` and check if any have completed
            // 運行 `run_on_new_num_futs`, 并檢查是否有已經完成的
            res = run_on_new_num_futs.select_next_some() => {
                println!("run_on_new_num_fut returned {:?}", res);
            },
            // panic if everything completed, since the `interval_timer` should
            // keep yielding values indefinitely.
           // 若所有任務都完成,直接 `panic`, 原因是 `interval_timer` 應該連續不斷的產生值,而不是結束
            //后,執行到 `complete` 分支
            complete => panic!("`interval_timer` completed unexpectedly"),
        }
    }
}


七、Workarounds to Know and Love 一些疑難問題的解決辦法

7.1 async 塊中的 ?

  • Async 塊中使用 ? 是比較常見的
  • 但是 async 塊的回傳型別沒有明確說明,這可能會導致編譯器無法推斷 async 塊的錯誤型別
  • 目前沒法給 future 一個型別,也無法指明其型別
  • 臨時解決辦法:使用 “turbofish”運算子,為 async 塊提供成功和錯誤型別
#![allow(unused)]
fn main() {
  struct MyError
  async fn foo() -> Result<(), MyError> {
    Ok(())
  }
  async fn bar() -> Result<(), MyError> {
    Ok(())
  
  }
  
  let fut = async {
    foo().await?;
    bar().await?;
    Ok(())  // 報錯 cannot infer type for type parameter `E` declared on the enum `Result`
  };
}

可以使用 ::< ... > 的方式來增加型別注釋:

#![allow(unused)]
fn main() {
  struct MyError
  async fn foo() -> Result<(), MyError> {
    Ok(())
  }
  async fn bar() -> Result<(), MyError> {
    Ok(())
  
  }
  
  let fut = async {
    foo().await?;
    bar().await?;
    Ok::<(), MyError>(()) // <- note the explicit type annotation here
  };
}

7.2 Send Approximation

  • 有些 async fn 狀態機可安全的跨執行緒發送,有些則不行
  • async future 是否是 Send 的,取決于在 .await 點是否持有非 Send 的型別
  • 當值可能在跨域 .await 點被持有時,編譯器會盡力近似估算,但這種估算在很多地方顯得過于保守
  • 臨時辦法:引入塊作用域,把 non-Send 變數隔離

Rc無法在多執行緒環境使用,原因就在于它并未實作 Send 特征

use std::rc::Rc;

#[derive(Default)]
struct NotSend(Rc<()>);

async fn bar() {}
async fn foo() {
    NotSend::default();
    bar().await;
}

fn require_send(_: impl Send) {}

fn main() {
    require_send(foo());
}

即使上面的 foo 回傳的 FutureSend, 但是在它內部短暫的使用 NotSend 依然是安全的,原因在于它的作用域并沒有影響到 .await,下面來試試宣告一個變數,然后讓 .await的呼叫處于變數的作用域中試試:

async fn foo() {
    let x = NotSend::default();
    bar().await;
}

報錯:future cannot be sent between threads safely

.await在運行時處于 x 的作用域內,.await有可能被執行器調度到另一個執行緒上運行,而Rc并沒有實作Send,

可以將變數宣告在陳述句塊內,當陳述句塊結束時,變數會自動被 drop

async fn foo() {
    {
        let x = NotSend::default();
    }
    bar().await;
}

7.3 Recursion

  • 在內部,async fn 會創建一個狀態機型別,它含有每個被 .awaited 的子 future
    • 這就有點麻煩,因為狀態機需要包含其本身
  • 臨時辦法:引入間接,使用Box,并把 recursive 放入非 async 的函式,它會回傳 .boxed() async 塊
// This function: foo函式:
async fn foo() {
    step_one().await;
    step_two().await;
}
// generates a type like this:  會被編譯成類似下面的型別:
enum Foo {
    First(StepOne),
    Second(StepTwo),
}

// So this function: recursive函式
async fn recursive() {
    recursive().await;
    recursive().await;
}

// generates a type like this:  會生成類似以下的型別
enum Recursive {
    First(Recursive),
    Second(Recursive),
}

這是典型的動態大小型別,它的大小會無限增長,因此編譯器會直接報錯:

error[E0733]: recursion in an `async fn` requires boxing
 --> src/lib.rs:1:22
  |
1 | async fn recursive() {
  |                      ^ an `async fn` cannot invoke itself directly
  |
  = note: a recursive `async fn` must be rewritten to return a boxed future.

recursive 轉變成一個正常的函式,該函式回傳一個使用 Box 包裹的 async 陳述句塊:

use futures::future::{BoxFuture, FutureExt};

fn recursive() -> BoxFuture<'static, ()> {
    async move {
        recursive().await;
        recursive().await;
    }.boxed()
}

7.4 async in Trait

  • 目前 async fn 不可用在 trait 里
  • 臨時解決辦法:使用 async-trait

在目前版本中,我們還無法在特征中定義 async fn 函式,不過大家也不用擔心,目前已經有計劃在未來移除這個限制了,

trait Test {
    async fn test();
}

運行后報錯:

error[E0706]: functions in traits cannot be declared `async`
 --> src/main.rs:5:5
  |
5 |     async fn test();
  |     -----^^^^^^^^^^^
  |     |
  |     `async` because of this
  |
  = note: `async` trait functions are not currently supported
  = note: consider using the `async-trait` crate: https://crates.io/crates/async-trait

好在編譯器給出了提示,讓我們使用 async-trait 解決這個問題:

use async_trait::async_trait;

#[async_trait]
trait Advertisement {
    async fn run(&self);
}

struct Modal;

#[async_trait]
impl Advertisement for Modal {
    async fn run(&self) {
        self.render_fullscreen().await;
        for _ in 0..4u16 {
            remind_user_to_join_mailing_list().await;
        }
        self.hide_for_now().await;
    }
}

struct AutoplayingVideo {
    media_url: String,
}

#[async_trait]
impl Advertisement for AutoplayingVideo {
    async fn run(&self) {
        let stream = connect(&self.media_url).await;
        stream.play().await;

        // 用視頻說服用戶加入我們的郵件串列
        Modal.run().await;
    }
}

不過使用該包并不是免費的,每一次特征中的async函式被呼叫時,都會產生一次堆記憶體分配,對于大多數場景,這個性能開銷都可以接受,但是當函式一秒呼叫幾十萬、幾百萬次時,就得小心這塊兒代碼的性能了!

八、The Async Ecosystem

Rust 沒提供什么

  • Rust 目前只提供撰寫 async 代碼的基本要素,標準庫中尚未提供執行器、任務、反應器、組合器以及低級 I/O future 和 trait
  • 社區提供的 async 生態系統填補了這些空白

Async 運行時

  • Async 運行時是用于執行 async 應用程式的庫
  • 運行時通常將一個反應器與一個或多個執行器捆綁在一起
  • 反應器為異步I/O、行程間通信和計時器等外部事件提供訂閱機制
  • 在 async 運行時中,訂閱者通常是代表低級別 I/O 操作的 future
  • 執行者負責安排和執行任務
    • 它們跟蹤正在運行和暫停的任務,對future進行 poll 直到完成,并在任務能夠取得進展時喚醒任務
    • “執行者”一詞經常與“運行時”互換使用
  • 我們使用“生態系統”一詞來描述一個與兼容 trait 和特性捆綁在一起的運行時

社區提供的 async crates

  • futures crate ,提供了 Stream、Sink、AsyncRead、AsyncWrite 等 trait,以及組合器等工具,這些可能最侄訓成為標準庫的一部分
  • Futures 有自己的執行器,但沒有自己的反應器,因此它不支持 async I/O 或計時器 future 的執行
  • 因此,它不被認為是完整的運行時
  • 常見的選擇是: 與另一個 crate 中的執行器一起使用來自 futures 提供的工具

流行的運行時

  • Tokio:一個流行的 async 生態系統,包含 HTTP、gRPC 和跟蹤框架
  • Async-std:提供標準庫的 async 副本
  • Smol:小型、簡化的 async 運行時,提供可用于包裝 UnixStream 或 TcpListener 等結構的 async trait
  • Fuchsia-async:用于 Fuchsia OS 的執行器

確定生態兼容性

  • 與 async I/O、計時器、行程間通信或任務互動的 async 代碼通常取決于特定的異步執行器或反應器
  • 所有其他 async 代碼,如異步運算式、組合器、同步型別和流,通常與生態系統無關,前提是任何嵌套的 future 也與生態系統無關
  • 在開始一個專案之前,建議研究相關的 async 框架和庫,以確保與您選擇的運行時以及彼此之間的兼容性

單執行緒 VS 多執行緒執行器

  • async 執行器可以是單執行緒或多執行緒的
  • 多執行緒執行器可以在多個任務上同時取得進展,對于有許多任務的作業負載,它可以大大加快執行速度,但在任務之間同步資料通常更昂貴
  • 在單執行緒運行時和多執行緒運行時之間進行選擇時,建議測量應用程式的性能
  • 任務可以在創建它們的執行緒上運行,也可以在單獨的執行緒上運行
  • 異步運行時通常提供將任務生成到單獨執行緒的功能
    • 即使任務在不同的執行緒上執行,它們也應該是非阻塞的
  • 為了在多執行緒執行器上調度任務,它們必須是 Send 的
  • 一些運行時提供了生成非 Send 的任務的函式,確保每個任務都在生成它的執行緒上執行
  • 它們還可以提供將阻塞任務生成到專用執行緒的函式,這對于運行來自其他庫的阻塞同步代碼非常有用

九、Final Project: Building a Concurrent Web Server with Async Rust

構建并發的(單執行緒的)Web Server

  • 使用 async Rust 把 《The Rust Programming Language》(《Rust 權威指南》)一書中第 20 章的例子改為可并發處理請求的 Web Server
server on  master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base 
? tree
.
├── 404.html
├── Cargo.lock
├── Cargo.toml
├── hello.html
├── src
│   └── main.rs
└── target
    ├── CACHEDIR.TAG
    └── debug
        ├── build
        ├── deps
        │   ├── libserver-4f1db2ad8f9f3102.rmeta
        │   ├── libserver-d3461d12acfa742e.rmeta
        │   ├── server-4f1db2ad8f9f3102.d
        │   └── server-d3461d12acfa742e.d
        ├── examples
        └── incremental
            ├── server-1ufb07a5oidqf
            │   ├── s-glb12pwpr6-1ctgjrw-2k6dj6f8psanp
            │   │   ├── dep-graph.bin
            │   │   ├── query-cache.bin
            │   │   └── work-products.bin
            │   └── s-glb12pwpr6-1ctgjrw.lock
            └── server-3vy5xgpwslzqx
                ├── s-glb12pwpr7-10sbywt-3rbnmvgc9rslx
                │   ├── dep-graph.bin
                │   ├── query-cache.bin
                │   └── work-products.bin
                └── s-glb12pwpr7-10sbywt.lock

12 directories, 18 files

server on  master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base 
? 

main.rs

use std::fs;
use std::io::prelude::*;
use std::net::TcpListener;
use std::net::TcpStream;

fn main() {
    // Listen for incoming TCP connections on localhost port 7878 監聽本地埠 7878 ,等待 TCP 連接的建立
    let listener = TcpListener::bind("127.0.0.1:7878").unwrap();

    // Block forever, handling each request that arrives at this IP address 阻塞等待請求的進入
    for stream in listener.incoming() {
        let stream = stream.unwrap();

        handle_connection(stream);
    }
}

fn handle_connection(mut stream: TcpStream) {
    // Read the first 1024 bytes of data from the stream 從連接中順序讀取 1024 位元組資料
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).unwrap();

    let get = b"GET / HTTP/1.1\r\n";

    // Respond with greetings or a 404,
    // depending on the data in the request 處理HTTP協議頭,若不符合則回傳404和對應的`html`檔案
    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();

    // Write response back to the stream, 將回復內容寫入連接快取中
    // and flush the stream to ensure the response is sent back to the client 使用flush將快取中的內容發送到客戶端
    let response = format!("{status_line}{contents}");
    stream.write_all(response.as_bytes()).unwrap();
    stream.flush().unwrap();
}

hello.html

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="utf-8">
    <title>Hello!</title>
</head>

<body>
    <h1>Hello!</h1>
    <p>Hi from Rust</p>
</body>

</html>

404.html

<!DOCTYPE html>
<html lang="en">

<head>
    <meta charset="utf-8">
    <title>Hello!</title>
</head>

<body>
    <h1>Oops!</h1>
    <p>Sorry, I don't know what you're asking for.</p>
</body>

</html>

運行

server on  master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base took 5.0s 
? cargo run
    Finished dev [unoptimized + debuginfo] target(s) in 0.00s
     Running `target/debug/server`


![image-20230526102033235](../../../Library/Application Support/typora-user-images/image-20230526102033235.png)

![image-20230526102147894](../../../Library/Application Support/typora-user-images/image-20230526102147894.png)

并發地處理連接

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::task;
use futures::stream::StreamExt;
use std::fs;
use std::time::Duration;


#[async_std::main]
async fn main() {
    // Listen for incoming TCP connections on localhost port 7878 監聽本地埠 7878 ,等待 TCP 連接的建立
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();

    listener.incoming().for_each_concurrent(/* limit */ None, |tcpstream| async move {
        let  tcpstream = tcpstream.unwrap();
        handle_connection(tcpstream).await;
    }) 
    .await;
}

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        // async_std::task::sleep,它僅會讓當前的任務陷入睡眠,然后該任務會讓出執行緒的控制權,這樣執行緒就可以繼續運行其它任務,
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{status_line}{contents}");
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

Cargo .toml

[package]
name = "server"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
async-std = { version = "1.12.0", features = ["attributes"] }
futures = "0.3.28"

異步版本的 TcpListenerlistener.incoming() 實作了 Stream 特征,以上修改有兩個好處:

  • listener.incoming() 不再阻塞
  • 使用 for_each_concurrent 并發地處理從 Stream 獲取的元素

訪問:http://127.0.0.1:7878/ 和 http://127.0.0.1:7878/sleep

使用多執行緒并行處理請求

use async_std::net::TcpListener;
use async_std::net::TcpStream;
use async_std::prelude::*;
use async_std::task;
use async_std::task::spawn;
use futures::stream::StreamExt;
use std::fs;
use std::time::Duration;


#[async_std::main]
async fn main() {
    // Listen for incoming TCP connections on localhost port 7878 監聽本地埠 7878 ,等待 TCP 連接的建立
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();

    listener.incoming().for_each_concurrent(/* limit */ None, |tcpstream| async move {
        let  tcpstream = tcpstream.unwrap();
        // handle_connection(tcpstream).await;
        spawn(handle_connection(tcpstream));
    }) 
    .await;
}

async fn handle_connection(mut stream: TcpStream) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        // async_std::task::sleep,它僅會讓當前的任務陷入睡眠,然后該任務會讓出執行緒的控制權,這樣執行緒就可以繼續運行其它任務,
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{status_line}{contents}");
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

Testing the TCP Server 測驗 handle_connection 函式

use async_std::io::{Read, Write};
use async_std::net::TcpListener;
use async_std::prelude::*;
use async_std::task;
use async_std::task::spawn;
use futures::stream::StreamExt;
use std::fs;
use std::marker::Unpin;
use std::time::Duration;

#[async_std::main]
async fn main() {
    // Listen for incoming TCP connections on localhost port 7878 監聽本地埠 7878 ,等待 TCP 連接的建立
    let listener = TcpListener::bind("127.0.0.1:7878").await.unwrap();

    listener
        .incoming()
        .for_each_concurrent(/* limit */ None, |tcpstream| async move {
            let tcpstream = tcpstream.unwrap();
            // handle_connection(tcpstream).await;
            spawn(handle_connection(tcpstream));
        })
        .await;
}

async fn handle_connection(mut stream: impl Read + Write + Unpin) {
    let mut buffer = [0; 1024];
    stream.read(&mut buffer).await.unwrap();

    let get = b"GET / HTTP/1.1\r\n";
    let sleep = b"GET /sleep HTTP/1.1\r\n";

    let (status_line, filename) = if buffer.starts_with(get) {
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else if buffer.starts_with(sleep) {
        // async_std::task::sleep,它僅會讓當前的任務陷入睡眠,然后該任務會讓出執行緒的控制權,這樣執行緒就可以繼續運行其它任務,
        task::sleep(Duration::from_secs(5)).await;
        ("HTTP/1.1 200 OK\r\n\r\n", "hello.html")
    } else {
        ("HTTP/1.1 404 NOT FOUND\r\n\r\n", "404.html")
    };
    let contents = fs::read_to_string(filename).unwrap();

    let response = format!("{status_line}{contents}");
    stream.write(response.as_bytes()).await.unwrap();
    stream.flush().await.unwrap();
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::io::Error;
    use futures::task::{Context, Poll};

    use std::cmp::min;
    use std::pin::Pin;

    struct MockTcpStream {
        read_data: Vec<u8>,
        write_data: Vec<u8>,
    }

    impl Read for MockTcpStream {
        fn poll_read(
            self: Pin<&mut Self>,
            _: &mut Context,
            buf: &mut [u8],
        ) -> Poll<Result<usize, Error>> {
            let size: usize = min(self.read_data.len(), buf.len());
            buf[..size].copy_from_slice(&self.read_data[..size]);
            Poll::Ready(Ok(size))
        }
    }

    impl Write for MockTcpStream {
        fn poll_write(
            // poll_write 會拷貝輸入資料到mock的 TcpStream 中,當完成后回傳 Poll::Ready
            mut self: Pin<&mut Self>,
            _: &mut Context,
            buf: &[u8],
        ) -> Poll<Result<usize, Error>> {
            self.write_data = https://www.cnblogs.com/QiaoPengjun/archive/2023/05/26/Vec::from(buf);

            Poll::Ready(Ok(buf.len()))
        }

        fn poll_flush(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
            Poll::Ready(Ok(()))
        }

        fn poll_close(self: Pin<&mut Self>, _: &mut Context) -> Poll> {
            Poll::Ready(Ok(()))
        }
    }

    use std::marker::Unpin;
    impl Unpin for MockTcpStream {}

    use std::fs;

    #[async_std::test]
    async fn test_handle_connection() {
        let input_bytes = b"GET / HTTP/1.1\r\n";
        let mut contents = vec![0u8; 1024];
        contents[..input_bytes.len()].clone_from_slice(input_bytes);
        let mut stream = MockTcpStream {
            read_data: contents,
            write_data: Vec::new(),
        };

        handle_connection(&mut stream).await;
        let mut buf = [0u8; 1024];
        stream.read(&mut buf).await.unwrap();

        let expected_contents = fs::read_to_string("hello.html").unwrap();
        let expected_response = format!("HTTP/1.1 200 OK\r\n\r\n{}", expected_contents);
        assert!(stream.write_data.starts_with(expected_response.as_bytes()));
    }
}

運行

server on  master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base took 53m 39.4s 
? cargo test
   Compiling server v0.1.0 (/Users/qiaopengjun/rust/server)
    Finished test [unoptimized + debuginfo] target(s) in 0.64s
     Running unittests src/main.rs (target/debug/deps/server-c69496e95f46c228)

running 1 test
test tests::test_handle_connection ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; 0 measured; 0 filtered out; finished in 0.00s


server on  master [?] is ?? 0.1.0 via ?? 1.67.1 via ?? base 
? 

本文來自博客園,作者:QIAOPENGJUN,轉載請注明原文鏈接:https://www.cnblogs.com/QiaoPengjun/p/17434443.html

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/553516.html

標籤:其他

上一篇:【華為機試】單詞倒敘

下一篇:返回列表

標籤雲
其他(159771) Python(38173) JavaScript(25456) Java(18138) C(15231) 區塊鏈(8268) C#(7972) AI(7469) 爪哇(7425) MySQL(7213) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5873) 数组(5741) R(5409) Linux(5343) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4576) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2434) ASP.NET(2403) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) .NET技术(1977) 功能(1967) Web開發(1951) HtmlCss(1948) C++(1922) python-3.x(1918) 弹簧靴(1913) xml(1889) PostgreSQL(1878) .NETCore(1862) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust async 編程

    # Rust async 編程 Asynchronous Programming in Rust: 中文書名《Rust 異步編程指南》: Rust語言圣經(Rust Course): ## 一、[Getting Started](https://rust-lang.github.io/async-b ......

    uj5u.com 2023-05-26 18:37:19 more
  • 【華為機試】單詞倒敘

    - 題目描述: 輸入單行英文句子,里面包含英文字母,空格以及,.?三種標點符號,請將句子內每個單詞進行倒序,并輸出倒序后的陳述句 - 輸入描述: 輸入字串 S, S 的長度 1≤N≤100 - 輸出描述: 輸出逆序后的字串。 - 解題思路: 遍歷給定句子,判斷如果字母,則插入到指定位置,如果是指定 ......

    uj5u.com 2023-05-26 18:36:39 more
  • 01.初識Python

    > 本教程計劃通過100天的時間,每天分享一篇關于python的知識點,與大家一起學習python這門編程語言。 Python 對初學者來說是一門很棒的語言: - 容易學 - 有一個積極的支持社區 - 在網路開發、游戲、資料科學方面提供多種機會。 ## Python的應用領域 目前Python在We ......

    uj5u.com 2023-05-26 18:23:04 more
  • Netty實戰(四)

    本節我們看看Netty的傳輸(全是干貨,自帶水杯 # 一、Java的NIO和OIO 流經網路的資料總是具有相同的型別:位元組。這些位元組是如何流動的主要取決于我們所說的網路傳輸。 ## 1.1 OIO 我們先來看一段Java的阻塞應用程式程式: ```java package com.example.j ......

    uj5u.com 2023-05-26 18:12:13 more
  • Python工具箱系列(三十三)

    Timescaledb 在物聯網時代,出現了大量以時間為中心海量產生的傳感器資料,稱為時序資料。這類資料的特點是: 資料記錄總有一個時間戳。 資料幾乎總是追加,不更新也不洗掉。 大量使用近期的資料。很少更新或者回填時間間隔的缺失資料。 與時間間隔頻率關系不大。但累積的資料量大,可能會有峰值。 對這類 ......

    uj5u.com 2023-05-26 18:11:53 more
  • 【python基礎】撰寫/運行hello world專案

    # 1.撰寫hello world專案 編程界每種語言的第一個程式往往都是輸出hello world。因此我們來看看,如何用Python輸出hello world。 1.如果你是初學者,main.py中的代碼暫時是無法看懂的,所以可以把main中的源代碼直接洗掉。如下所示 ![image](http ......

    uj5u.com 2023-05-26 17:58:40 more
  • Java設計模式-策略模式

    # 簡介 在軟體開發中,設計模式是為了解決常見問題而提供的一套可重用的解決方案。策略模式(Strategy Pattern)是其中一種常見的設計模式,它屬于行為型模式。該模式的核心思想是將不同的演算法封裝成獨立的策略類,使得它們可以相互替換,而不影響客戶端的使用。 策略模式與其他設計模式有一些明顯的區 ......

    uj5u.com 2023-05-26 17:46:25 more
  • Fastjson 很快,但不適合我....

    作者:nyingping\ 來源:juejin.cn/post/7215886869199863869 > 記者:大爺您有什么特長呀? > > FastJson:我很快。 > > 記者:23423 乘以 4534 等于多少? > > FastJson:等于 2343. > > 記者:?? > > F ......

    uj5u.com 2023-05-26 17:45:46 more
  • JavaWeb編程面試題——Spring Web MVC

    面試題==知識點,這里所記錄的面試題并不針對于面試者,而是將這些面試題作為技能知識點來看待。不以刷題進大廠為目的,而是以學習為目的。這里的知識點會持續更新,目錄也會隨時進行調整。 ......

    uj5u.com 2023-05-26 17:44:55 more
  • 聊聊「短信」渠道的設計與實作

    短信在實作的邏輯上,也遵循訊息中心的基礎設計,即訊息生產之后,通過訊息中心進行投遞和消費,屬于典型的生產消費模型; ......

    uj5u.com 2023-05-26 17:44:17 more