Làm việc với Bất kỳ Số lượng Future nào

Khi chúng ta chuyển từ việc sử dụng hai future sang ba future trong phần trước, chúng ta cũng phải chuyển từ việc sử dụng join sang sử dụng join3. Sẽ rất khó chịu nếu phải gọi một hàm khác mỗi khi chúng ta thay đổi số lượng future muốn kết hợp. May mắn thay, chúng ta có dạng macro của join mà chúng ta có thể truyền vào một số lượng tham số tùy ý. Nó cũng tự xử lý việc await các future. Vì vậy, chúng ta có thể viết lại mã từ Listing 17-13 để sử dụng join! thay vì join3, như trong Listing 17-14.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        trpl::join!(tx1_fut, tx_fut, rx_fut);
    });
}

Đây chắc chắn là một cải tiến so với việc chuyển đổi giữa joinjoin3join4 và v.v.! Tuy nhiên, ngay cả dạng macro này cũng chỉ hoạt động khi chúng ta biết số lượng future trước. Trong Rust thực tế, việc đưa các future vào một tập hợp và sau đó đợi một số hoặc tất cả các future hoàn thành là một mẫu phổ biến.

Để kiểm tra tất cả các future trong một tập hợp, chúng ta cần lặp qua và kết hợp tất cả chúng. Hàm trpl::join_all chấp nhận bất kỳ kiểu nào thực thi trait Iterator, mà bạn đã học trong The Iterator Trait and the next Method Chương 13, vì vậy nó có vẻ như đúng là thứ chúng ta cần. Hãy thử đặt các future của chúng ta vào một vector và thay thế join! bằng join_all như trong Listing 17-15.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures = vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;
    });
}

Thật không may, mã này không biên dịch được. Thay vào đó, chúng ta nhận được lỗi này:

error[E0308]: mismatched types
  --> src/main.rs:45:37
   |
10 |         let tx1_fut = async move {
   |                       ---------- the expected `async` block
...
24 |         let rx_fut = async {
   |                      ----- the found `async` block
...
45 |         let futures = vec![tx1_fut, rx_fut, tx_fut];
   |                                     ^^^^^^ expected `async` block, found a different `async` block
   |
   = note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
              found `async` block `{async block@src/main.rs:24:22: 24:27}`
   = note: no two async blocks, even if identical, have the same type
   = help: consider pinning your async block and casting it to a trait object

Điều này có thể gây ngạc nhiên. Rốt cuộc, không có async block nào trả về bất cứ thứ gì, nên mỗi block tạo ra một Future<Output = ()>. Hãy nhớ rằng Future là một trait, tuy nhiên, và trình biên dịch tạo ra một enum duy nhất cho mỗi async block. Bạn không thể đặt hai struct viết tay khác nhau trong một Vec, và quy tắc tương tự áp dụng cho các enum khác nhau được tạo ra bởi trình biên dịch.

Để làm cho điều này hoạt động, chúng ta cần sử dụng trait objects, giống như chúng ta đã làm trong "Returning Errors from the run function" ở Chương 12. (Chúng ta sẽ đề cập chi tiết về các trait object trong Chương 18.) Sử dụng trait object cho phép chúng ta xem xét mỗi future ẩn danh được tạo ra bởi các kiểu này là cùng một kiểu, bởi vì tất cả chúng đều thực thi trait Future.

Lưu ý: Trong Using an Enum to Store Multiple Values ở Chương 8, chúng ta đã thảo luận về một cách khác để bao gồm nhiều kiểu trong một Vec: sử dụng một enum để đại diện cho mỗi kiểu có thể xuất hiện trong vector. Chúng ta không thể làm điều đó ở đây. Một mặt, chúng ta không có cách để đặt tên cho các kiểu khác nhau, bởi vì chúng là ẩn danh. Mặt khác, lý do chúng ta chọn vector và join_all ngay từ đầu là để có thể làm việc với một tập hợp động của các future mà chúng ta chỉ quan tâm rằng chúng có cùng kiểu đầu ra.

Chúng ta bắt đầu bằng cách bọc mỗi future trong vec! trong một Box::new, như trong Listing 17-16.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures =
            vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

        trpl::join_all(futures).await;
    });
}

Thật không may, mã này vẫn không biên dịch. Thực tế, chúng ta nhận được cùng lỗi cơ bản mà chúng ta đã nhận trước đó cho cả hai lệnh gọi Box::new thứ hai và thứ ba, cũng như các lỗi mới đề cập đến trait Unpin. Chúng ta sẽ quay lại các lỗi Unpin trong giây lát. Trước tiên, hãy sửa các lỗi kiểu trên các lệnh gọi Box::new bằng cách chú thích rõ ràng kiểu của biến futures (xem Listing 17-17).

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures: Vec<Box<dyn Future<Output = ()>>> =
            vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];

        trpl::join_all(futures).await;
    });
}

Khai báo kiểu này hơi phức tạp, vì vậy hãy cùng đi qua nó:

  1. Kiểu bên trong cùng là future. Chúng ta lưu ý rõ ràng rằng đầu ra của future là kiểu đơn vị () bằng cách viết Future<Output = ()>.
  2. Sau đó, chúng ta chú thích trait với dyn để đánh dấu nó là động.
  3. Toàn bộ tham chiếu trait được bọc trong một Box.
  4. Cuối cùng, chúng ta nêu rõ rằng futures là một Vec chứa những mục này.

Điều đó đã tạo ra một sự khác biệt lớn. Bây giờ khi chúng ta chạy trình biên dịch, chúng ta chỉ nhận được các lỗi đề cập đến Unpin. Mặc dù có ba lỗi, nhưng nội dung của chúng rất giống nhau.

error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
   --> src/main.rs:49:24
    |
49  |         trpl::join_all(futures).await;
    |         -------------- ^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
    |         |
    |         required by a bound introduced by this call
    |
    = note: consider using the `pin!` macro
            consider using `Box::pin` if you need to access the pinned value outside of the current scope
    = note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `join_all`
   --> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:105:14
    |
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
    |        -------- required by a bound in this function
...
105 |     I::Item: Future,
    |              ^^^^^^ required by this bound in `join_all`

error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
  --> src/main.rs:49:9
   |
49 |         trpl::join_all(futures).await;
   |         ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
  --> src/main.rs:49:33
   |
49 |         trpl::join_all(futures).await;
   |                                 ^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
   |
   = note: consider using the `pin!` macro
           consider using `Box::pin` if you need to access the pinned value outside of the current scope
   = note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
  --> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
   |
27 | pub struct JoinAll<F>
   |            ------- required by a bound in this struct
28 | where
29 |     F: Future,
   |        ^^^^^^ required by this bound in `JoinAll`

For more information about this error, try `rustc --explain E0277`.
error: could not compile `async_await` (bin "async_await") due to 3 previous errors

Đó là rất nhiều thông tin để tiêu hóa, vì vậy hãy phân tích nó. Phần đầu tiên của thông báo cho chúng ta biết rằng async block đầu tiên (src/main.rs:8:23: 20:10) không thực thi trait Unpin và đề xuất sử dụng pin! hoặc Box::pin để giải quyết vấn đề. Về sau trong chương, chúng ta sẽ đi sâu vào một vài chi tiết khác về PinUnpin. Tuy nhiên, hiện tại, chúng ta có thể làm theo lời khuyên của trình biên dịch để giải quyết vấn đề. Trong Listing 17-18, chúng ta bắt đầu bằng cách import Pin từ std::pin. Tiếp theo, chúng ta cập nhật chú thích kiểu cho futures, với một Pin bao bọc mỗi Box. Cuối cùng, chúng ta sử dụng Box::pin để cố định các future.

extern crate trpl; // required for mdbook test

use std::pin::Pin;

// -- snip --

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        };

        let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> =
            vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)];

        trpl::join_all(futures).await;
    });
}

Nếu chúng ta biên dịch và chạy mã này, cuối cùng chúng ta sẽ nhận được kết quả mong muốn:

received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'

Xong rồi!

Có một chút điều cần khám phá thêm ở đây. Một mặt, việc sử dụng Pin<Box<T>> thêm một lượng nhỏ chi phí từ việc đưa các future này lên heap với Box—và chúng ta chỉ làm điều đó để làm cho các kiểu phù hợp. Chúng ta thực sự không cần cấp phát trên heap, sau tất cả: các future này là cục bộ cho hàm cụ thể này. Như đã lưu ý trước đó, Pin tự nó là một kiểu bao bọc, vì vậy chúng ta có thể nhận được lợi ích của việc có một kiểu duy nhất trong Vec—lý do ban đầu chúng ta chọn Box—mà không cần cấp phát heap. Chúng ta có thể sử dụng Pin trực tiếp với mỗi future, sử dụng macro std::pin::pin.

Tuy nhiên, chúng ta vẫn phải rõ ràng về kiểu của tham chiếu được ghim; nếu không, Rust vẫn không biết cách diễn giải chúng như các đối tượng trait động, đó là những gì chúng ta cần chúng trở thành trong Vec. Do đó, chúng ta thêm pin vào danh sách import của chúng ta từ std::pin. Sau đó, chúng ta có thể pin! mỗi future khi chúng ta định nghĩa nó và định nghĩa futures là một Vec chứa các tham chiếu có thể thay đổi được ghim đến kiểu future động, như trong Listing 17-19.

extern crate trpl; // required for mdbook test

use std::pin::{Pin, pin};

// -- snip --

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let rx_fut = pin!(async {
            // --snip--
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        });

        let tx_fut = pin!(async move {
            // --snip--
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_secs(1)).await;
            }
        });

        let futures: Vec<Pin<&mut dyn Future<Output = ()>>> =
            vec![tx1_fut, rx_fut, tx_fut];

        trpl::join_all(futures).await;
    });
}

Chúng ta đã đi đến đây bằng cách bỏ qua thực tế rằng chúng ta có thể có các kiểu Output khác nhau. Ví dụ, trong Listing 17-20, future ẩn danh cho a thực thi Future<Output = u32>, future ẩn danh cho b thực thi Future<Output = &str>, và future ẩn danh cho c thực thi Future<Output = bool>.

extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let a = async { 1u32 };
        let b = async { "Hello!" };
        let c = async { true };

        let (a_result, b_result, c_result) = trpl::join!(a, b, c);
        println!("{a_result}, {b_result}, {c_result}");
    });
}

Chúng ta có thể sử dụng trpl::join! để đợi chúng, bởi vì nó cho phép chúng ta truyền vào nhiều kiểu future và tạo ra một tuple của các kiểu đó. Chúng ta không thể sử dụng trpl::join_all, bởi vì nó yêu cầu tất cả các future được truyền vào phải có cùng kiểu. Hãy nhớ rằng, lỗi đó là những gì khiến chúng ta bắt đầu cuộc phiêu lưu này với Pin!

Đây là một sự đánh đổi cơ bản: chúng ta có thể xử lý một số lượng động của các future với join_all, miễn là tất cả chúng đều có cùng kiểu, hoặc chúng ta có thể xử lý một tập hợp cố định số lượng future với các hàm join hoặc macro join!, ngay cả khi chúng có các kiểu khác nhau. Đây là cùng một kịch bản chúng ta gặp phải khi làm việc với bất kỳ kiểu nào khác trong Rust. Future không có gì đặc biệt, mặc dù chúng ta có một số cú pháp tốt để làm việc với chúng, và đó là một điều tốt.

Đua Các Future

Khi chúng ta "join" các future với họ hàm join và các macro, chúng ta yêu cầu tất cả chúng phải hoàn thành trước khi chúng ta tiếp tục. Đôi khi, tuy nhiên, chúng ta chỉ cần một số future từ một tập hợp hoàn thành trước khi chúng ta tiếp tục—hơi giống với việc đua một future với một future khác.

Trong Listing 17-21, chúng ta một lần nữa sử dụng trpl::race để chạy hai future, slowfast, đua nhau.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            println!("'slow' started.");
            trpl::sleep(Duration::from_millis(100)).await;
            println!("'slow' finished.");
        };

        let fast = async {
            println!("'fast' started.");
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'fast' finished.");
        };

        trpl::race(slow, fast).await;
    });
}

Mỗi future in một thông báo khi nó bắt đầu chạy, tạm dừng một khoảng thời gian bằng cách gọi và đợi sleep, và sau đó in một thông báo khác khi nó hoàn thành. Sau đó, chúng ta truyền cả slowfast cho trpl::race và đợi một trong số chúng hoàn thành. (Kết quả ở đây không quá đáng ngạc nhiên: fast thắng.) Không giống như khi chúng ta sử dụng race trở lại trong "Our First Async Program", chúng ta chỉ bỏ qua thể hiện Either mà nó trả về ở đây, bởi vì tất cả các hành vi thú vị xảy ra trong phần thân của các async block.

Chú ý rằng nếu bạn đảo ngược thứ tự các đối số cho race, thứ tự của các thông báo "started" sẽ thay đổi, mặc dù future fast luôn hoàn thành trước. Đó là vì việc thực thi của hàm race cụ thể này là không công bằng. Nó luôn chạy các future được truyền vào làm đối số theo thứ tự chúng được truyền vào. Các cách thực thi khác công bằng và sẽ chọn ngẫu nhiên future nào được poll đầu tiên. Bất kể việc thực thi race mà chúng ta đang sử dụng có công bằng hay không, một trong các future sẽ chạy đến điểm await đầu tiên trong phần thân của nó trước khi một tác vụ khác có thể bắt đầu.

Nhớ lại từ Our First Async Program rằng tại mỗi điểm await, Rust cho phép runtime có cơ hội tạm dừng tác vụ và chuyển sang một tác vụ khác nếu future đang được await chưa sẵn sàng. Điều ngược lại cũng đúng: Rust chỉ tạm dừng các async block và trả quyền kiểm soát lại cho runtime tại một điểm await. Mọi thứ giữa các điểm await là đồng bộ.

Điều đó có nghĩa là nếu bạn thực hiện một loạt công việc trong một async block mà không có điểm await, future đó sẽ chặn bất kỳ future nào khác thực hiện tiến trình. Đôi khi bạn có thể nghe điều này được gọi là một future đói các future khác. Trong một số trường hợp, điều đó có thể không phải là vấn đề lớn. Tuy nhiên, nếu bạn đang thực hiện một số loại thiết lập đắt tiền hoặc công việc chạy dài, hoặc nếu bạn có một future sẽ tiếp tục thực hiện một số tác vụ cụ thể vô thời hạn, bạn sẽ cần suy nghĩ về khi nào và ở đâu để bàn giao quyền kiểm soát cho runtime.

Cũng vậy, nếu bạn có các hoạt động chặn chạy dài, async có thể là một công cụ hữu ích để cung cấp các cách cho các phần khác nhau của chương trình liên quan với nhau.

Nhưng làm thế nào bạn sẽ bàn giao quyền kiểm soát cho runtime trong những trường hợp đó?

Nhường Quyền Kiểm Soát cho Runtime

Hãy mô phỏng một hoạt động chạy dài. Listing 17-22 giới thiệu một hàm slow.

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        // We will call `slow` here later
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

Mã này sử dụng std::thread::sleep thay vì trpl::sleep để khi gọi slow sẽ chặn thread hiện tại trong một số mili giây. Chúng ta có thể sử dụng slow để đại diện cho các hoạt động trong thế giới thực vừa chạy lâu vừa chặn.

Trong Listing 17-23, chúng ta sử dụng slow để mô phỏng việc thực hiện loại công việc gắn với CPU này trong một cặp future.

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            slow("a", 10);
            slow("a", 20);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            slow("b", 10);
            slow("b", 15);
            slow("b", 350);
            trpl::sleep(Duration::from_millis(50)).await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

Để bắt đầu, mỗi future chỉ trả lại quyền kiểm soát cho runtime sau khi thực hiện một loạt các hoạt động chậm. Nếu bạn chạy mã này, bạn sẽ thấy kết quả này:

'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.

Giống như ví dụ trước đó của chúng ta, race vẫn kết thúc ngay khi a hoàn thành. Không có sự xen kẽ giữa hai future, mặc dù. Future a thực hiện tất cả công việc của nó cho đến khi lệnh gọi trpl::sleep được await, sau đó future b thực hiện tất cả công việc của nó cho đến khi lệnh gọi trpl::sleep của riêng nó được await, và cuối cùng future a hoàn thành. Để cho phép cả hai future thực hiện tiến trình giữa các tác vụ chậm của chúng, chúng ta cần các điểm await để chúng ta có thể bàn giao quyền kiểm soát cho runtime. Điều đó có nghĩa là chúng ta cần thứ gì đó mà chúng ta có thể await!

Chúng ta đã có thể thấy loại bàn giao này xảy ra trong Listing 17-23: nếu chúng ta loại bỏ trpl::sleep ở cuối future a, nó sẽ hoàn thành mà không có future b chạy chút nào. Hãy thử sử dụng hàm sleep làm điểm khởi đầu để cho phép các hoạt động luân phiên thực hiện tiến trình, như được hiển thị trong Listing 17-24.

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let one_ms = Duration::from_millis(1);

        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::sleep(one_ms).await;
            slow("a", 10);
            trpl::sleep(one_ms).await;
            slow("a", 20);
            trpl::sleep(one_ms).await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::sleep(one_ms).await;
            slow("b", 10);
            trpl::sleep(one_ms).await;
            slow("b", 15);
            trpl::sleep(one_ms).await;
            slow("b", 350);
            trpl::sleep(one_ms).await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

Trong Listing 17-24, chúng ta thêm các lệnh gọi trpl::sleep với các điểm await giữa mỗi lệnh gọi đến slow. Bây giờ công việc của hai future được xen kẽ:

'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.

Future a vẫn chạy một chút trước khi bàn giao quyền kiểm soát cho b, bởi vì nó gọi slow trước khi từng gọi trpl::sleep, nhưng sau đó các future hoán đổi qua lại mỗi khi một trong số chúng đạt đến một điểm await. Trong trường hợp này, chúng ta đã làm điều đó sau mỗi lệnh gọi đến slow, nhưng chúng ta có thể chia nhỏ công việc theo bất kỳ cách nào hợp lý nhất đối với chúng ta.

Tuy nhiên, chúng ta không thực sự muốn ngủ ở đây: chúng ta muốn thực hiện tiến trình nhanh nhất có thể. Chúng ta chỉ cần trả lại quyền kiểm soát cho runtime. Chúng ta có thể làm điều đó trực tiếp, sử dụng hàm yield_now. Trong Listing 17-25, chúng ta thay thế tất cả những lệnh gọi sleep bằng yield_now.

extern crate trpl; // required for mdbook test

use std::{thread, time::Duration};

fn main() {
    trpl::run(async {
        let a = async {
            println!("'a' started.");
            slow("a", 30);
            trpl::yield_now().await;
            slow("a", 10);
            trpl::yield_now().await;
            slow("a", 20);
            trpl::yield_now().await;
            println!("'a' finished.");
        };

        let b = async {
            println!("'b' started.");
            slow("b", 75);
            trpl::yield_now().await;
            slow("b", 10);
            trpl::yield_now().await;
            slow("b", 15);
            trpl::yield_now().await;
            slow("b", 350);
            trpl::yield_now().await;
            println!("'b' finished.");
        };

        trpl::race(a, b).await;
    });
}

fn slow(name: &str, ms: u64) {
    thread::sleep(Duration::from_millis(ms));
    println!("'{name}' ran for {ms}ms");
}

Mã này vừa rõ ràng hơn về ý định thực tế vừa có thể nhanh hơn đáng kể so với việc sử dụng sleep, bởi vì các bộ đếm thời gian như bộ được sử dụng bởi sleep thường có giới hạn về mức độ chi tiết mà chúng có thể có. Phiên bản sleep mà chúng ta đang sử dụng, ví dụ, sẽ luôn ngủ ít nhất một mili giây, ngay cả khi chúng ta truyền cho nó một Duration một nano giây. Một lần nữa, máy tính hiện đại nhanh: chúng có thể làm rất nhiều trong một mili giây!

Bạn có thể tự mình thấy điều này bằng cách thiết lập một benchmark nhỏ, chẳng hạn như bench trong Listing 17-26. (Đây không phải là một cách đặc biệt nghiêm ngặt để thực hiện kiểm tra hiệu suất, nhưng nó đủ để hiển thị sự khác biệt ở đây.)

extern crate trpl; // required for mdbook test

use std::time::{Duration, Instant};

fn main() {
    trpl::run(async {
        let one_ns = Duration::from_nanos(1);
        let start = Instant::now();
        async {
            for _ in 1..1000 {
                trpl::sleep(one_ns).await;
            }
        }
        .await;
        let time = Instant::now() - start;
        println!(
            "'sleep' version finished after {} seconds.",
            time.as_secs_f32()
        );

        let start = Instant::now();
        async {
            for _ in 1..1000 {
                trpl::yield_now().await;
            }
        }
        .await;
        let time = Instant::now() - start;
        println!(
            "'yield' version finished after {} seconds.",
            time.as_secs_f32()
        );
    });
}

Ở đây, chúng ta bỏ qua tất cả việc in trạng thái, truyền một Duration một nano giây cho trpl::sleep, và để mỗi future tự chạy, không có sự chuyển đổi giữa các future. Sau đó, chúng ta chạy 1.000 lần lặp lại và xem future sử dụng trpl::sleep mất bao lâu so với future sử dụng trpl::yield_now.

Phiên bản với yield_now nhanh hơn nhiều!

Điều này có nghĩa là async có thể hữu ích ngay cả cho các tác vụ gắn với tính toán, tùy thuộc vào những gì chương trình của bạn đang làm, bởi vì nó cung cấp một công cụ hữu ích để cấu trúc các mối quan hệ giữa các phần khác nhau của chương trình. Đây là một hình thức đa nhiệm hợp tác, trong đó mỗi future có quyền xác định khi nào nó bàn giao quyền kiểm soát thông qua các điểm await. Mỗi future do đó cũng có trách nhiệm tránh chặn quá lâu. Trong một số hệ điều hành nhúng dựa trên Rust, đây là loại duy nhất của đa nhiệm!

Trong mã thực tế, bạn thường sẽ không thay đổi các lệnh gọi hàm với các điểm await trên mỗi dòng, tất nhiên. Mặc dù nhường quyền kiểm soát theo cách này là tương đối không tốn kém, nhưng nó không miễn phí. Trong nhiều trường hợp, việc cố gắng chia nhỏ một tác vụ gắn với tính toán có thể làm cho nó chậm hơn đáng kể, vì vậy đôi khi tốt hơn cho hiệu suất tổng thể là để một hoạt động chặn ngắn. Luôn đo lường để xem thực sự các nút thắt cổ chai hiệu suất của mã của bạn là gì. Động lực cơ bản là điều quan trọng cần ghi nhớ, tuy nhiên, nếu bạn đang thấy nhiều công việc xảy ra tuần tự mà bạn mong đợi xảy ra đồng thời!

Xây Dựng Các Sự Trừu Tượng Async Của Riêng Chúng Ta

Chúng ta cũng có thể kết hợp các future lại với nhau để tạo ra các mẫu mới. Ví dụ, chúng ta có thể xây dựng một hàm timeout với các khối xây dựng async mà chúng ta đã có. Khi chúng ta hoàn thành, kết quả sẽ là một khối xây dựng khác mà chúng ta có thể sử dụng để tạo ra thêm các sự trừu tượng async.

Listing 17-27 hiển thị cách chúng ta mong đợi timeout này hoạt động với một future chậm.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_millis(100)).await;
            "I finished!"
        };

        match timeout(slow, Duration::from_millis(10)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

Hãy triển khai điều này! Để bắt đầu, hãy suy nghĩ về API cho timeout:

  • Nó cần phải là một hàm async để chúng ta có thể await nó.
  • Tham số đầu tiên của nó phải là một future để chạy. Chúng ta có thể làm cho nó generic để cho phép nó hoạt động với bất kỳ future nào.
  • Tham số thứ hai của nó sẽ là thời gian tối đa để đợi. Nếu chúng ta sử dụng một Duration, điều đó sẽ làm cho nó dễ dàng để truyền cho trpl::sleep.
  • Nó sẽ trả về một Result. Nếu future hoàn thành thành công, Result sẽ là Ok với giá trị được tạo ra bởi future. Nếu timeout hết hạn trước, Result sẽ là Err với duration mà timeout đã đợi.

Listing 17-28 hiển thị khai báo này.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_millis(10)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    // Here is where our implementation will go!
}

Điều đó đáp ứng các mục tiêu của chúng ta cho các kiểu. Bây giờ hãy suy nghĩ về hành vi mà chúng ta cần: chúng ta muốn đua future được truyền vào với duration. Chúng ta có thể sử dụng trpl::sleep để tạo một future bộ đếm thời gian từ duration, và sử dụng trpl::race để chạy bộ đếm thời gian đó với future mà người gọi truyền vào.

Chúng ta cũng biết rằng race không công bằng, polling các đối số theo thứ tự mà chúng được truyền. Do đó, chúng ta truyền future_to_try cho race trước để nó có cơ hội hoàn thành ngay cả khi max_time là một duration rất ngắn. Nếu future_to_try hoàn thành trước, race sẽ trả về Left với đầu ra từ future_to_try. Nếu timer hoàn thành trước, race sẽ trả về Right với đầu ra của timer là ().

Trong Listing 17-29, chúng ta match trên kết quả của việc await trpl::race.

extern crate trpl; // required for mdbook test

use std::time::Duration;

use trpl::Either;

// --snip--

fn main() {
    trpl::run(async {
        let slow = async {
            trpl::sleep(Duration::from_secs(5)).await;
            "Finally finished"
        };

        match timeout(slow, Duration::from_secs(2)).await {
            Ok(message) => println!("Succeeded with '{message}'"),
            Err(duration) => {
                println!("Failed after {} seconds", duration.as_secs())
            }
        }
    });
}

async fn timeout<F: Future>(
    future_to_try: F,
    max_time: Duration,
) -> Result<F::Output, Duration> {
    match trpl::race(future_to_try, trpl::sleep(max_time)).await {
        Either::Left(output) => Ok(output),
        Either::Right(_) => Err(max_time),
    }
}

Nếu future_to_try thành công và chúng ta nhận được Left(output), chúng ta trả về Ok(output). Nếu bộ đếm thời gian ngủ hết hạn thay vào đó và chúng ta nhận được Right(()), chúng ta bỏ qua () với _ và trả về Err(max_time) thay thế.

Với điều đó, chúng ta có một timeout hoạt động được xây dựng từ hai trợ giúp async khác. Nếu chúng ta chạy mã của mình, nó sẽ in chế độ thất bại sau timeout:

Failed after 2 seconds

Bởi vì future kết hợp với các future khác, bạn có thể xây dựng các công cụ thực sự mạnh mẽ sử dụng các khối xây dựng async nhỏ hơn. Ví dụ, bạn có thể sử dụng cùng cách tiếp cận này để kết hợp timeout với thử lại, và đến lượt sử dụng chúng với các hoạt động như các cuộc gọi mạng (một trong các ví dụ từ đầu chương).

Trong thực tế, bạn thường sẽ làm việc trực tiếp với asyncawait, và thứ hai là với các hàm và macro như join, join_all, race, và v.v. Bạn sẽ chỉ cần sử dụng pin thỉnh thoảng để sử dụng future với những API đó.

Bây giờ chúng ta đã thấy một số cách để làm việc với nhiều future cùng một lúc. Tiếp theo, chúng ta sẽ xem cách làm việc với nhiều future trong một chuỗi theo thời gian với streams. Dưới đây là một vài điều khác bạn có thể muốn xem xét trước, tuy nhiên:

  • Chúng ta đã sử dụng một Vec với join_all để đợi tất cả các future trong một nhóm hoàn thành. Làm thế nào bạn có thể sử dụng một Vec để xử lý một nhóm future tuần tự thay vào đó? Sự đánh đổi của việc làm điều đó là gì?

  • Hãy xem kiểu futures::stream::FuturesUnordered từ crate futures. Sử dụng nó sẽ khác với sử dụng một Vec như thế nào? (Đừng lo lắng về việc nó đến từ phần stream của crate; nó hoạt động hoàn toàn tốt với bất kỳ tập hợp future nào.)