Nhường Quyền Kiểm Soát cho Runtime
Nhớ lại từ phần "Our First Async Program" rằng tại mỗi điểm await, Rust cho phép runtime có cơ hội tạm dừng tác vụ và chuyển sang một tác vụ khác nếu future đang được await chưa sẵn sàng. Điều ngược lại cũng đúng: Rust chỉ tạm dừng các async block và trả quyền kiểm soát lại cho runtime tại một điểm await. Mọi thứ giữa các điểm await là đồng bộ.
Điều đó có nghĩa là nếu bạn thực hiện một loạt công việc trong một async block mà không có điểm await, future đó sẽ chặn bất kỳ future nào khác thực hiện tiến trình. Đôi khi bạn có thể nghe điều này được gọi là một future đói các future khác. Trong một số trường hợp, điều đó có thể không phải là vấn đề lớn. Tuy nhiên, nếu bạn đang thực hiện một số loại thiết lập đắt tiền hoặc công việc chạy dài, hoặc nếu bạn có một future sẽ tiếp tục thực hiện một số tác vụ cụ thể vô thời hạn, bạn sẽ cần suy nghĩ về khi nào và ở đâu để bàn giao quyền kiểm soát cho runtime.
Hãy mô phỏng một hoạt động chạy dài để minh họa vấn đề đói, sau đó khám phá cách
giải quyết nó. Listing 17-14 giới thiệu một hàm slow.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::block_on(async { // We will call `slow` here later }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Đây chắc chắn là một cải tiến so với việc chuyển đổi giữa join và join3 và
join4 và v.v.! Tuy nhiên, ngay cả dạng macro này cũng chỉ hoạt động khi chúng
ta biết số lượng future trước. Trong Rust thực tế, việc đưa các future vào một
tập hợp và sau đó đợi một số hoặc tất cả các future hoàn thành là một mẫu phổ
biến.
Để kiểm tra tất cả các future trong một tập hợp, chúng ta cần lặp qua và kết hợp
tất cả chúng. Hàm trpl::join_all chấp nhận bất kỳ kiểu nào thực thi trait
Iterator, mà bạn đã học trong [The Iterator Trait and the next
Method][iterator-trait] Chương 13, vì vậy nó có vẻ như đúng là
thứ chúng ta cần. Hãy thử đặt các future của chúng ta vào một vector và thay thế
join! bằng join_all như trong Listing 17-15.
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
let a = async {
println!("'a' started.");
slow("a", 30);
slow("a", 10);
slow("a", 20);
trpl::sleep(Duration::from_millis(50)).await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
slow("b", 10);
slow("b", 15);
slow("b", 350);
trpl::sleep(Duration::from_millis(50)).await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
Thật không may, mã này không biên dịch được. Thay vào đó, chúng ta nhận được lỗi này:
error[E0308]: mismatched types
--> src/main.rs:45:37
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
45 | let futures = vec![tx1_fut, rx_fut, tx_fut];
| ^^^^^^ expected `async` block, found a different `async` block
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
Điều này có thể gây ngạc nhiên. Rốt cuộc, không có async block nào trả về bất cứ
thứ gì, nên mỗi block tạo ra một Future<Output = ()>. Hãy nhớ rằng Future là
một trait, tuy nhiên, và trình biên dịch tạo ra một enum duy nhất cho mỗi async
block. Bạn không thể đặt hai struct viết tay khác nhau trong một Vec, và quy
tắc tương tự áp dụng cho các enum khác nhau được tạo ra bởi trình biên dịch.
Để làm cho điều này hoạt động, chúng ta cần sử dụng trait objects, giống như
chúng ta đã làm trong ["Returning Errors from the run
function"][dyn] ở Chương 12. (Chúng ta sẽ đề cập chi tiết về các
trait object trong Chương 18.) Sử dụng trait object cho phép chúng ta xem xét
mỗi future ẩn danh được tạo ra bởi các kiểu này là cùng một kiểu, bởi vì tất cả
chúng đều thực thi trait Future.
Lưu ý: Trong [Using an Enum to Store Multiple Values][enum-alt] ở Chương 8, chúng ta đã thảo luận về một cách khác để bao gồm nhiều kiểu trong một
Vec: sử dụng một enum để đại diện cho mỗi kiểu có thể xuất hiện trong vector. Chúng ta không thể làm điều đó ở đây. Một mặt, chúng ta không có cách để đặt tên cho các kiểu khác nhau, bởi vì chúng là ẩn danh. Mặt khác, lý do chúng ta chọn vector vàjoin_allngay từ đầu là để có thể làm việc với một tập hợp động của các future mà chúng ta chỉ quan tâm rằng chúng có cùng kiểu đầu ra.
Chúng ta bắt đầu bằng cách bọc mỗi future trong vec! trong một Box::new, như
trong Listing 17-16.
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
let one_ms = Duration::from_millis(1);
let a = async {
println!("'a' started.");
slow("a", 30);
trpl::sleep(one_ms).await;
slow("a", 10);
trpl::sleep(one_ms).await;
slow("a", 20);
trpl::sleep(one_ms).await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
trpl::sleep(one_ms).await;
slow("b", 10);
trpl::sleep(one_ms).await;
slow("b", 15);
trpl::sleep(one_ms).await;
slow("b", 350);
trpl::sleep(one_ms).await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
Thật không may, mã này vẫn không biên dịch. Thực tế, chúng ta nhận được cùng lỗi
cơ bản mà chúng ta đã nhận trước đó cho cả hai lệnh gọi Box::new thứ hai và
thứ ba, cũng như các lỗi mới đề cập đến trait Unpin. Chúng ta sẽ quay lại các
lỗi Unpin trong giây lát. Trước tiên, hãy sửa các lỗi kiểu trên các lệnh gọi
Box::new bằng cách chú thích rõ ràng kiểu của biến futures (xem Listing
17-17).
extern crate trpl; // required for mdbook test
use std::{thread, time::Duration};
fn main() {
trpl::block_on(async {
let a = async {
println!("'a' started.");
slow("a", 30);
trpl::yield_now().await;
slow("a", 10);
trpl::yield_now().await;
slow("a", 20);
trpl::yield_now().await;
println!("'a' finished.");
};
let b = async {
println!("'b' started.");
slow("b", 75);
trpl::yield_now().await;
slow("b", 10);
trpl::yield_now().await;
slow("b", 15);
trpl::yield_now().await;
slow("b", 350);
trpl::yield_now().await;
println!("'b' finished.");
};
trpl::select(a, b).await;
});
}
fn slow(name: &str, ms: u64) {
thread::sleep(Duration::from_millis(ms));
println!("'{name}' ran for {ms}ms");
}
Khai báo kiểu này hơi phức tạp, vì vậy hãy cùng đi qua nó:
- Kiểu bên trong cùng là future. Chúng ta lưu ý rõ ràng rằng đầu ra của future
là kiểu đơn vị
()bằng cách viếtFuture<Output = ()>. - Sau đó, chúng ta chú thích trait với
dynđể đánh dấu nó là động. - Toàn bộ tham chiếu trait được bọc trong một
Box. - Cuối cùng, chúng ta nêu rõ rằng
futureslà mộtVecchứa những mục này.
Điều đó đã tạo ra một sự khác biệt lớn. Bây giờ khi chúng ta chạy trình biên
dịch, chúng ta chỉ nhận được các lỗi đề cập đến Unpin. Mặc dù có ba lỗi, nhưng
nội dung của chúng rất giống nhau.
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:49:24
|
49 | trpl::join_all(futures).await;
| -------------- ^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
| |
| required by a bound introduced by this call
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `join_all`
--> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:105:14
|
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
| -------- required by a bound in this function
...
105 | I::Item: Future,
| ^^^^^^ required by this bound in `join_all`
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:49:9
|
49 | trpl::join_all(futures).await;
| ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:49:33
|
49 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
For more information about this error, try `rustc --explain E0277`.
error: could not compile `async_await` (bin "async_await") due to 3 previous errors
Đó là rất nhiều thông tin để tiêu hóa, vì vậy hãy phân tích nó. Phần đầu tiên
của thông báo cho chúng ta biết rằng async block đầu tiên
(src/main.rs:8:23: 20:10) không thực thi trait Unpin và đề xuất sử dụng
pin! hoặc Box::pin để giải quyết vấn đề. Về sau trong chương, chúng ta sẽ đi
sâu vào một vài chi tiết khác về Pin và Unpin. Tuy nhiên, hiện tại, chúng ta
có thể làm theo lời khuyên của trình biên dịch để giải quyết vấn đề. Trong
Listing 17-18, chúng ta bắt đầu bằng cách import Pin từ std::pin. Tiếp theo,
chúng ta cập nhật chú thích kiểu cho futures, với một Pin bao bọc mỗi Box.
Cuối cùng, chúng ta sử dụng Box::pin để cố định các future.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::block_on(async { let slow = async { trpl::sleep(Duration::from_secs(5)).await; "Finally finished" }; match timeout(slow, Duration::from_secs(2)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); }
Nếu chúng ta biên dịch và chạy mã này, cuối cùng chúng ta sẽ nhận được kết quả mong muốn:
received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'
Xong rồi!
Có một chút điều cần khám phá thêm ở đây. Một mặt, việc sử dụng Pin<Box<T>>
thêm một lượng nhỏ chi phí từ việc đưa các future này lên heap với Box—và
chúng ta chỉ làm điều đó để làm cho các kiểu phù hợp. Chúng ta thực sự không
cần cấp phát trên heap, sau tất cả: các future này là cục bộ cho hàm cụ thể
này. Như đã lưu ý trước đó, Pin tự nó là một kiểu bao bọc, vì vậy chúng ta có
thể nhận được lợi ích của việc có một kiểu duy nhất trong Vec—lý do ban đầu
chúng ta chọn Box—mà không cần cấp phát heap. Chúng ta có thể sử dụng Pin
trực tiếp với mỗi future, sử dụng macro std::pin::pin.
Tuy nhiên, chúng ta vẫn phải rõ ràng về kiểu của tham chiếu được ghim; nếu
không, Rust vẫn không biết cách diễn giải chúng như các đối tượng trait động, đó
là những gì chúng ta cần chúng trở thành trong Vec. Do đó, chúng ta thêm pin
vào danh sách import của chúng ta từ std::pin. Sau đó, chúng ta có thể pin!
mỗi future khi chúng ta định nghĩa nó và định nghĩa futures là một Vec chứa
các tham chiếu có thể thay đổi được ghim đến kiểu future động, như trong Listing
17-19.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::block_on(async { let slow = async { trpl::sleep(Duration::from_secs(5)).await; "Finally finished" }; match timeout(slow, Duration::from_secs(2)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); } async fn timeout<F: Future>( future_to_try: F, max_time: Duration, ) -> Result<F::Output, Duration> { // Here is where our implementation will go! }
Chúng ta đã đi đến đây bằng cách bỏ qua thực tế rằng chúng ta có thể có các kiểu
Output khác nhau. Ví dụ, trong Listing 17-20, future ẩn danh cho a thực thi
Future<Output = u32>, future ẩn danh cho b thực thi Future<Output = &str>,
và future ẩn danh cho c thực thi Future<Output = bool>.
extern crate trpl; // required for mdbook test use std::time::Duration; use trpl::Either; // --snip-- fn main() { trpl::block_on(async { let slow = async { trpl::sleep(Duration::from_secs(5)).await; "Finally finished" }; match timeout(slow, Duration::from_secs(2)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); } async fn timeout<F: Future>( future_to_try: F, max_time: Duration, ) -> Result<F::Output, Duration> { match trpl::select(future_to_try, trpl::sleep(max_time)).await { Either::Left(output) => Ok(output), Either::Right(_) => Err(max_time), } }
Chúng ta có thể sử dụng trpl::join! để đợi chúng, bởi vì nó cho phép chúng ta
truyền vào nhiều kiểu future và tạo ra một tuple của các kiểu đó. Chúng ta
không thể sử dụng trpl::join_all, bởi vì nó yêu cầu tất cả các future được
truyền vào phải có cùng kiểu. Hãy nhớ rằng, lỗi đó là những gì khiến chúng ta
bắt đầu cuộc phiêu lưu này với Pin!
Đây là một sự đánh đổi cơ bản: chúng ta có thể xử lý một số lượng động của các
future với join_all, miễn là tất cả chúng đều có cùng kiểu, hoặc chúng ta có
thể xử lý một tập hợp cố định số lượng future với các hàm join hoặc macro
join!, ngay cả khi chúng có các kiểu khác nhau. Đây là cùng một kịch bản chúng
ta gặp phải khi làm việc với bất kỳ kiểu nào khác trong Rust. Future không có gì
đặc biệt, mặc dù chúng ta có một số cú pháp tốt để làm việc với chúng, và đó là
một điều tốt.
Đua Các Future
Khi chúng ta "join" các future với họ hàm join và các macro, chúng ta yêu cầu
tất cả chúng phải hoàn thành trước khi chúng ta tiếp tục. Đôi khi, tuy nhiên,
chúng ta chỉ cần một số future từ một tập hợp hoàn thành trước khi chúng ta
tiếp tục—hơi giống với việc đua một future với một future khác.
Trong Listing 17-21, chúng ta một lần nữa sử dụng trpl::race để chạy hai
future, slow và fast, đua nhau.
extern crate trpl; // required for mdbook test fn main() { trpl::block_on(async { let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; let iter = values.iter().map(|n| n * 2); let mut stream = trpl::stream_from_iter(iter); while let Some(value) = stream.next().await { println!("The value was: {value}"); } }); }
Mỗi future in một thông báo khi nó bắt đầu chạy, tạm dừng một khoảng thời gian
bằng cách gọi và đợi sleep, và sau đó in một thông báo khác khi nó hoàn thành.
Sau đó, chúng ta truyền cả slow và fast cho trpl::race và đợi một trong số
chúng hoàn thành. (Kết quả ở đây không quá đáng ngạc nhiên: fast thắng.) Không
giống như khi chúng ta sử dụng race trở lại trong "Our First Async
Program", chúng ta chỉ bỏ qua thể hiện Either mà nó trả về ở đây, bởi vì tất
cả các hành vi thú vị xảy ra trong phần thân của các async block.
Chú ý rằng nếu bạn đảo ngược thứ tự các đối số cho race, thứ tự của các thông
báo "started" sẽ thay đổi, mặc dù future fast luôn hoàn thành trước. Đó là vì
việc thực thi của hàm race cụ thể này là không công bằng. Nó luôn chạy các
future được truyền vào làm đối số theo thứ tự chúng được truyền vào. Các cách
thực thi khác là công bằng và sẽ chọn ngẫu nhiên future nào được poll đầu
tiên. Bất kể việc thực thi race mà chúng ta đang sử dụng có công bằng hay không,
một trong các future sẽ chạy đến điểm await đầu tiên trong phần thân của nó
trước khi một tác vụ khác có thể bắt đầu.
Nhớ lại từ Our First Async Program rằng tại mỗi điểm await, Rust cho phép runtime có cơ hội tạm dừng tác vụ và chuyển sang một tác vụ khác nếu future đang được await chưa sẵn sàng. Điều ngược lại cũng đúng: Rust chỉ tạm dừng các async block và trả quyền kiểm soát lại cho runtime tại một điểm await. Mọi thứ giữa các điểm await là đồng bộ.
Điều đó có nghĩa là nếu bạn thực hiện một loạt công việc trong một async block mà không có điểm await, future đó sẽ chặn bất kỳ future nào khác thực hiện tiến trình. Đôi khi bạn có thể nghe điều này được gọi là một future đói các future khác. Trong một số trường hợp, điều đó có thể không phải là vấn đề lớn. Tuy nhiên, nếu bạn đang thực hiện một số loại thiết lập đắt tiền hoặc công việc chạy dài, hoặc nếu bạn có một future sẽ tiếp tục thực hiện một số tác vụ cụ thể vô thời hạn, bạn sẽ cần suy nghĩ về khi nào và ở đâu để bàn giao quyền kiểm soát cho runtime.
Cũng vậy, nếu bạn có các hoạt động chặn chạy dài, async có thể là một công cụ hữu ích để cung cấp các cách cho các phần khác nhau của chương trình liên quan với nhau.
Nhưng làm thế nào bạn sẽ bàn giao quyền kiểm soát cho runtime trong những trường hợp đó?
Nhường Quyền Kiểm Soát cho Runtime
Hãy mô phỏng một hoạt động chạy dài. Listing 17-22 giới thiệu một hàm slow.
extern crate trpl; // required for mdbook test use trpl::StreamExt; fn main() { trpl::block_on(async { let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]; // --snip-- let iter = values.iter().map(|n| n * 2); let mut stream = trpl::stream_from_iter(iter); while let Some(value) = stream.next().await { println!("The value was: {value}"); } }); }
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::block_on(async { // We will call `slow` here later }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Mã này sử dụng std::thread::sleep thay vì trpl::sleep để khi gọi slow sẽ
chặn thread hiện tại trong một số mili giây. Chúng ta có thể sử dụng slow để
đại diện cho các hoạt động trong thế giới thực vừa chạy lâu vừa chặn.
Trong Listing 17-15, chúng ta sử dụng slow để mô phỏng việc thực hiện loại
công việc gắn với CPU này trong một cặp future.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::block_on(async { let a = async { println!("'a' started."); slow("a", 30); slow("a", 10); slow("a", 20); trpl::sleep(Duration::from_millis(50)).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); slow("b", 10); slow("b", 15); slow("b", 350); trpl::sleep(Duration::from_millis(50)).await; println!("'b' finished."); }; trpl::select(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Để bắt đầu, mỗi future chỉ trả lại quyền kiểm soát cho runtime sau khi thực hiện một loạt các hoạt động chậm. Nếu bạn chạy mã này, bạn sẽ thấy kết quả này:
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
Mỗi future trả lại quyền kiểm soát cho runtime chỉ sau khi thực hiện một loạt các hoạt động chậm. Nếu bạn chạy mã này, bạn sẽ thấy kết quả này:
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
Giống như ví dụ Listing 17-5 khi chúng ta sử dụng trpl::select để đua các
future lấy hai URL, select vẫn kết thúc ngay khi a hoàn thành. Không có sự
xen kẽ giữa các lệnh gọi slow trong hai future. Future a thực hiện tất cả
công việc của nó cho đến khi lệnh gọi trpl::sleep được await, sau đó future
b thực hiện tất cả công việc của nó cho đến khi lệnh gọi trpl::sleep của
riêng nó được await, và cuối cùng future a hoàn thành. Để cho phép cả hai
future thực hiện tiến trình giữa các tác vụ chậm của chúng, chúng ta cần các
điểm await để chúng ta có thể bàn giao quyền kiểm soát cho runtime. Điều đó có
nghĩa là chúng ta cần thứ gì đó mà chúng ta có thể await!
Chúng ta đã có thể thấy loại bàn giao này xảy ra trong Listing 17-15: nếu chúng
ta loại bỏ trpl::sleep ở cuối future a, nó sẽ hoàn thành mà không có future
b chạy chút nào. Hãy thử sử dụng hàm trpl::sleep làm điểm khởi đầu để cho
phép các hoạt động luân phiên thực hiện tiến trình, như được hiển thị trong
Listing 17-16.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::block_on(async { let one_ms = Duration::from_millis(1); let a = async { println!("'a' started."); slow("a", 30); trpl::sleep(one_ms).await; slow("a", 10); trpl::sleep(one_ms).await; slow("a", 20); trpl::sleep(one_ms).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::sleep(one_ms).await; slow("b", 10); trpl::sleep(one_ms).await; slow("b", 15); trpl::sleep(one_ms).await; slow("b", 350); trpl::sleep(one_ms).await; println!("'b' finished."); }; trpl::select(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Chúng ta đã thêm các lệnh gọi trpl::sleep với các điểm await giữa mỗi lệnh gọi
đến slow. Bây giờ công việc của hai future được xen kẽ:
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
Future a vẫn chạy một chút trước khi bàn giao quyền kiểm soát cho b, bởi vì
nó gọi slow trước khi từng gọi trpl::sleep, nhưng sau đó các future hoán đổi
qua lại mỗi khi một trong số chúng đạt đến một điểm await. Trong trường hợp này,
chúng ta đã làm điều đó sau mỗi lệnh gọi đến slow, nhưng chúng ta có thể chia
nhỏ công việc theo bất kỳ cách nào hợp lý nhất đối với chúng ta.
Tuy nhiên, chúng ta không thực sự muốn ngủ ở đây: chúng ta muốn thực hiện tiến
trình nhanh nhất có thể. Chúng ta chỉ cần trả lại quyền kiểm soát cho runtime.
Chúng ta có thể làm điều đó trực tiếp, sử dụng hàm trpl::yield_now. Trong
Listing 17-17, chúng ta thay thế tất cả những lệnh gọi trpl::sleep bằng
trpl::yield_now.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::block_on(async { let a = async { println!("'a' started."); slow("a", 30); trpl::yield_now().await; slow("a", 10); trpl::yield_now().await; slow("a", 20); trpl::yield_now().await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::yield_now().await; slow("b", 10); trpl::yield_now().await; slow("b", 15); trpl::yield_now().await; slow("b", 350); trpl::yield_now().await; println!("'b' finished."); }; trpl::select(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Mã này vừa rõ ràng hơn về ý định thực tế vừa có thể nhanh hơn đáng kể so với
việc sử dụng sleep, bởi vì các bộ đếm thời gian như bộ được sử dụng bởi
sleep thường có giới hạn về mức độ chi tiết mà chúng có thể có. Phiên bản
sleep mà chúng ta đang sử dụng, ví dụ, sẽ luôn ngủ ít nhất một mili giây, ngay
cả khi chúng ta truyền cho nó một Duration một nano giây. Một lần nữa, máy
tính hiện đại nhanh: chúng có thể làm rất nhiều trong một mili giây!
Điều này có nghĩa là async có thể hữu ích ngay cả cho các tác vụ gắn với tính toán, tùy thuộc vào những gì chương trình của bạn đang làm, bởi vì nó cung cấp một công cụ hữu ích để cấu trúc các mối quan hệ giữa các phần khác nhau của chương trình (nhưng với chi phí của overhead từ state machine async). Đây là một hình thức đa nhiệm hợp tác, trong đó mỗi future có quyền xác định khi nào nó bàn giao quyền kiểm soát thông qua các điểm await. Mỗi future do đó cũng có trách nhiệm tránh chặn quá lâu. Trong một số hệ điều hành nhúng dựa trên Rust, đây là loại duy nhất của đa nhiệm!
Trong mã thực tế, bạn thường sẽ không thay đổi các lệnh gọi hàm với các điểm await trên mỗi dòng, tất nhiên. Mặc dù nhường quyền kiểm soát theo cách này là tương đối không tốn kém, nhưng nó không miễn phí. Trong nhiều trường hợp, việc cố gắng chia nhỏ một tác vụ gắn với tính toán có thể làm cho nó chậm hơn đáng kể, vì vậy đôi khi tốt hơn cho hiệu suất tổng thể là để một hoạt động chặn ngắn. Luôn đo lường để xem thực sự các nút thắt cổ chai hiệu suất của mã của bạn là gì. Động lực cơ bản là điều quan trọng cần ghi nhớ, tuy nhiên, nếu bạn đang thấy nhiều công việc xảy ra tuần tự mà bạn mong đợi xảy ra đồng thời!
Xây Dựng Các Sự Trừu Tượng Async Của Riêng Chúng Ta
Chúng ta cũng có thể kết hợp các future lại với nhau để tạo ra các mẫu mới. Ví
dụ, chúng ta có thể xây dựng một hàm timeout với các khối xây dựng async mà
chúng ta đã có. Khi chúng ta hoàn thành, kết quả sẽ là một khối xây dựng khác mà
chúng ta có thể sử dụng để tạo ra thêm các sự trừu tượng async.
Listing 17-18 hiển thị cách chúng ta mong đợi timeout này hoạt động với một
future chậm.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
Hãy triển khai điều này! Để bắt đầu, hãy suy nghĩ về API cho timeout:
- Nó cần phải là một hàm async để chúng ta có thể await nó.
- Tham số đầu tiên của nó phải là một future để chạy. Chúng ta có thể làm cho nó generic để cho phép nó hoạt động với bất kỳ future nào.
- Tham số thứ hai của nó sẽ là thời gian tối đa để đợi. Nếu chúng ta sử dụng một
Duration, điều đó sẽ làm cho nó dễ dàng để truyền chotrpl::sleep. - Nó sẽ trả về một
Result. Nếu future hoàn thành thành công,Resultsẽ làOkvới giá trị được tạo ra bởi future. Nếu timeout hết hạn trước,Resultsẽ làErrvới duration mà timeout đã đợi.
Listing 17-19 hiển thị khai báo này.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::block_on(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_secs(2)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration,
) -> Result<F::Output, Duration> {
// Here is where our implementation will go!
}
Điều đó đáp ứng các mục tiêu của chúng ta cho các kiểu. Bây giờ hãy suy nghĩ về
hành vi mà chúng ta cần: chúng ta muốn đua future được truyền vào với
duration. Chúng ta có thể sử dụng trpl::sleep để tạo một future bộ đếm thời
gian từ duration, và sử dụng trpl::select để chạy bộ đếm thời gian đó với
future mà người gọi truyền vào.
Trong Listing 17-20, chúng ta triển khai timeout bằng cách match trên kết quả
của việc await trpl::select.
extern crate trpl; // required for mdbook test use std::time::Duration; use trpl::Either; // --snip-- fn main() { trpl::block_on(async { let slow = async { trpl::sleep(Duration::from_secs(5)).await; "Finally finished" }; match timeout(slow, Duration::from_secs(2)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); } async fn timeout<F: Future>( future_to_try: F, max_time: Duration, ) -> Result<F::Output, Duration> { match trpl::select(future_to_try, trpl::sleep(max_time)).await { Either::Left(output) => Ok(output), Either::Right(_) => Err(max_time), } }
Việc triển khai trpl::select không công bằng: nó luôn poll các đối số theo thứ
tự mà chúng được truyền vào (các triển khai select khác sẽ chọn ngẫu nhiên đối
số nào được poll đầu tiên). Do đó, chúng ta truyền future_to_try cho select
trước để nó có cơ hội hoàn thành ngay cả khi max_time là một duration rất
ngắn. Nếu future_to_try hoàn thành trước, select sẽ trả về Left với đầu ra
từ future_to_try. Nếu timer hoàn thành trước, select sẽ trả về Right với
đầu ra của timer là ().
Nếu future_to_try thành công và chúng ta nhận được Left(output), chúng ta
trả về Ok(output). Nếu bộ đếm thời gian ngủ hết hạn thay vào đó và chúng ta
nhận được Right(()), chúng ta bỏ qua () với _ và trả về Err(max_time)
thay thế.
Với điều đó, chúng ta có một timeout hoạt động được xây dựng từ hai trợ giúp
async khác. Nếu chúng ta chạy mã của mình, nó sẽ in chế độ thất bại sau timeout:
Failed after 2 seconds
Bởi vì future kết hợp với các future khác, bạn có thể xây dựng các công cụ thực sự mạnh mẽ sử dụng các khối xây dựng async nhỏ hơn. Ví dụ, bạn có thể sử dụng cùng cách tiếp cận này để kết hợp timeout với thử lại, và đến lượt sử dụng chúng với các hoạt động như các cuộc gọi mạng (như những cuộc gọi trong Listing 17-5).
Trong thực tế, bạn thường sẽ làm việc trực tiếp với async và await, và thứ
hai là với các hàm như select và macro như join! để kiểm soát cách các
future bên ngoài được thực thi.
Bây giờ chúng ta đã thấy một số cách để làm việc với nhiều future cùng một lúc. Tiếp theo, chúng ta sẽ xem cách làm việc với nhiều future trong một chuỗi theo thời gian với streams.