Làm việc với Bất kỳ Số lượng Future nào
Khi chúng ta chuyển từ việc sử dụng hai future sang ba future trong phần trước,
chúng ta cũng phải chuyển từ việc sử dụng join
sang sử dụng join3
. Sẽ rất
khó chịu nếu phải gọi một hàm khác mỗi khi chúng ta thay đổi số lượng future
muốn kết hợp. May mắn thay, chúng ta có dạng macro của join
mà chúng ta có thể
truyền vào một số lượng tham số tùy ý. Nó cũng tự xử lý việc await các future.
Vì vậy, chúng ta có thể viết lại mã từ Listing 17-13 để sử dụng join!
thay vì
join3
, như trong Listing 17-14.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; trpl::join!(tx1_fut, tx_fut, rx_fut); }); }
Đây chắc chắn là một cải tiến so với việc chuyển đổi giữa join
và join3
và
join4
và v.v.! Tuy nhiên, ngay cả dạng macro này cũng chỉ hoạt động khi chúng
ta biết số lượng future trước. Trong Rust thực tế, việc đưa các future vào một
tập hợp và sau đó đợi một số hoặc tất cả các future hoàn thành là một mẫu phổ
biến.
Để kiểm tra tất cả các future trong một tập hợp, chúng ta cần lặp qua và kết hợp
tất cả chúng. Hàm trpl::join_all
chấp nhận bất kỳ kiểu nào thực thi trait
Iterator
, mà bạn đã học trong The Iterator Trait and the next
Method Chương 13, vì vậy nó có vẻ như đúng là
thứ chúng ta cần. Hãy thử đặt các future của chúng ta vào một vector và thay thế
join!
bằng join_all
như trong Listing 17-15.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures = vec![tx1_fut, rx_fut, tx_fut];
trpl::join_all(futures).await;
});
}
Thật không may, mã này không biên dịch được. Thay vào đó, chúng ta nhận được lỗi này:
error[E0308]: mismatched types
--> src/main.rs:45:37
|
10 | let tx1_fut = async move {
| ---------- the expected `async` block
...
24 | let rx_fut = async {
| ----- the found `async` block
...
45 | let futures = vec![tx1_fut, rx_fut, tx_fut];
| ^^^^^^ expected `async` block, found a different `async` block
|
= note: expected `async` block `{async block@src/main.rs:10:23: 10:33}`
found `async` block `{async block@src/main.rs:24:22: 24:27}`
= note: no two async blocks, even if identical, have the same type
= help: consider pinning your async block and casting it to a trait object
Điều này có thể gây ngạc nhiên. Rốt cuộc, không có async block nào trả về bất cứ
thứ gì, nên mỗi block tạo ra một Future<Output = ()>
. Hãy nhớ rằng Future
là
một trait, tuy nhiên, và trình biên dịch tạo ra một enum duy nhất cho mỗi async
block. Bạn không thể đặt hai struct viết tay khác nhau trong một Vec
, và quy
tắc tương tự áp dụng cho các enum khác nhau được tạo ra bởi trình biên dịch.
Để làm cho điều này hoạt động, chúng ta cần sử dụng trait objects, giống như
chúng ta đã làm trong "Returning Errors from the run
function" ở Chương 12. (Chúng ta sẽ đề cập chi tiết về các
trait object trong Chương 18.) Sử dụng trait object cho phép chúng ta xem xét
mỗi future ẩn danh được tạo ra bởi các kiểu này là cùng một kiểu, bởi vì tất cả
chúng đều thực thi trait Future
.
Lưu ý: Trong Using an Enum to Store Multiple Values ở Chương 8, chúng ta đã thảo luận về một cách khác để bao gồm nhiều kiểu trong một
Vec
: sử dụng một enum để đại diện cho mỗi kiểu có thể xuất hiện trong vector. Chúng ta không thể làm điều đó ở đây. Một mặt, chúng ta không có cách để đặt tên cho các kiểu khác nhau, bởi vì chúng là ẩn danh. Mặt khác, lý do chúng ta chọn vector vàjoin_all
ngay từ đầu là để có thể làm việc với một tập hợp động của các future mà chúng ta chỉ quan tâm rằng chúng có cùng kiểu đầu ra.
Chúng ta bắt đầu bằng cách bọc mỗi future trong vec!
trong một Box::new
, như
trong Listing 17-16.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
Thật không may, mã này vẫn không biên dịch. Thực tế, chúng ta nhận được cùng lỗi
cơ bản mà chúng ta đã nhận trước đó cho cả hai lệnh gọi Box::new
thứ hai và
thứ ba, cũng như các lỗi mới đề cập đến trait Unpin
. Chúng ta sẽ quay lại các
lỗi Unpin
trong giây lát. Trước tiên, hãy sửa các lỗi kiểu trên các lệnh gọi
Box::new
bằng cách chú thích rõ ràng kiểu của biến futures
(xem Listing
17-17).
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let (tx, mut rx) = trpl::channel();
let tx1 = tx.clone();
let tx1_fut = async move {
let vals = vec![
String::from("hi"),
String::from("from"),
String::from("the"),
String::from("future"),
];
for val in vals {
tx1.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let rx_fut = async {
while let Some(value) = rx.recv().await {
println!("received '{value}'");
}
};
let tx_fut = async move {
let vals = vec![
String::from("more"),
String::from("messages"),
String::from("for"),
String::from("you"),
];
for val in vals {
tx.send(val).unwrap();
trpl::sleep(Duration::from_secs(1)).await;
}
};
let futures: Vec<Box<dyn Future<Output = ()>>> =
vec![Box::new(tx1_fut), Box::new(rx_fut), Box::new(tx_fut)];
trpl::join_all(futures).await;
});
}
Khai báo kiểu này hơi phức tạp, vì vậy hãy cùng đi qua nó:
- Kiểu bên trong cùng là future. Chúng ta lưu ý rõ ràng rằng đầu ra của future
là kiểu đơn vị
()
bằng cách viếtFuture<Output = ()>
. - Sau đó, chúng ta chú thích trait với
dyn
để đánh dấu nó là động. - Toàn bộ tham chiếu trait được bọc trong một
Box
. - Cuối cùng, chúng ta nêu rõ rằng
futures
là mộtVec
chứa những mục này.
Điều đó đã tạo ra một sự khác biệt lớn. Bây giờ khi chúng ta chạy trình biên
dịch, chúng ta chỉ nhận được các lỗi đề cập đến Unpin
. Mặc dù có ba lỗi, nhưng
nội dung của chúng rất giống nhau.
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:49:24
|
49 | trpl::join_all(futures).await;
| -------------- ^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
| |
| required by a bound introduced by this call
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `join_all`
--> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:105:14
|
102 | pub fn join_all<I>(iter: I) -> JoinAll<I::Item>
| -------- required by a bound in this function
...
105 | I::Item: Future,
| ^^^^^^ required by this bound in `join_all`
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:49:9
|
49 | trpl::join_all(futures).await;
| ^^^^^^^^^^^^^^^^^^^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
error[E0277]: `dyn Future<Output = ()>` cannot be unpinned
--> src/main.rs:49:33
|
49 | trpl::join_all(futures).await;
| ^^^^^ the trait `Unpin` is not implemented for `dyn Future<Output = ()>`
|
= note: consider using the `pin!` macro
consider using `Box::pin` if you need to access the pinned value outside of the current scope
= note: required for `Box<dyn Future<Output = ()>>` to implement `Future`
note: required by a bound in `futures_util::future::join_all::JoinAll`
--> file:///home/.cargo/registry/src/index.crates.io-1949cf8c6b5b557f/futures-util-0.3.30/src/future/join_all.rs:29:8
|
27 | pub struct JoinAll<F>
| ------- required by a bound in this struct
28 | where
29 | F: Future,
| ^^^^^^ required by this bound in `JoinAll`
For more information about this error, try `rustc --explain E0277`.
error: could not compile `async_await` (bin "async_await") due to 3 previous errors
Đó là rất nhiều thông tin để tiêu hóa, vì vậy hãy phân tích nó. Phần đầu tiên
của thông báo cho chúng ta biết rằng async block đầu tiên
(src/main.rs:8:23: 20:10
) không thực thi trait Unpin
và đề xuất sử dụng
pin!
hoặc Box::pin
để giải quyết vấn đề. Về sau trong chương, chúng ta sẽ đi
sâu vào một vài chi tiết khác về Pin
và Unpin
. Tuy nhiên, hiện tại, chúng ta
có thể làm theo lời khuyên của trình biên dịch để giải quyết vấn đề. Trong
Listing 17-18, chúng ta bắt đầu bằng cách import Pin
từ std::pin
. Tiếp theo,
chúng ta cập nhật chú thích kiểu cho futures
, với một Pin
bao bọc mỗi Box
.
Cuối cùng, chúng ta sử dụng Box::pin
để cố định các future.
extern crate trpl; // required for mdbook test use std::pin::Pin; // -- snip -- use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = async move { let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let rx_fut = async { while let Some(value) = rx.recv().await { println!("received '{value}'"); } }; let tx_fut = async move { let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }; let futures: Vec<Pin<Box<dyn Future<Output = ()>>>> = vec![Box::pin(tx1_fut), Box::pin(rx_fut), Box::pin(tx_fut)]; trpl::join_all(futures).await; }); }
Nếu chúng ta biên dịch và chạy mã này, cuối cùng chúng ta sẽ nhận được kết quả mong muốn:
received 'hi'
received 'more'
received 'from'
received 'messages'
received 'the'
received 'for'
received 'future'
received 'you'
Xong rồi!
Có một chút điều cần khám phá thêm ở đây. Một mặt, việc sử dụng Pin<Box<T>>
thêm một lượng nhỏ chi phí từ việc đưa các future này lên heap với Box
—và
chúng ta chỉ làm điều đó để làm cho các kiểu phù hợp. Chúng ta thực sự không
cần cấp phát trên heap, sau tất cả: các future này là cục bộ cho hàm cụ thể
này. Như đã lưu ý trước đó, Pin
tự nó là một kiểu bao bọc, vì vậy chúng ta có
thể nhận được lợi ích của việc có một kiểu duy nhất trong Vec
—lý do ban đầu
chúng ta chọn Box
—mà không cần cấp phát heap. Chúng ta có thể sử dụng Pin
trực tiếp với mỗi future, sử dụng macro std::pin::pin
.
Tuy nhiên, chúng ta vẫn phải rõ ràng về kiểu của tham chiếu được ghim; nếu
không, Rust vẫn không biết cách diễn giải chúng như các đối tượng trait động, đó
là những gì chúng ta cần chúng trở thành trong Vec
. Do đó, chúng ta thêm pin
vào danh sách import của chúng ta từ std::pin
. Sau đó, chúng ta có thể pin!
mỗi future khi chúng ta định nghĩa nó và định nghĩa futures
là một Vec
chứa
các tham chiếu có thể thay đổi được ghim đến kiểu future động, như trong Listing
17-19.
extern crate trpl; // required for mdbook test use std::pin::{Pin, pin}; // -- snip -- use std::time::Duration; fn main() { trpl::run(async { let (tx, mut rx) = trpl::channel(); let tx1 = tx.clone(); let tx1_fut = pin!(async move { // --snip-- let vals = vec![ String::from("hi"), String::from("from"), String::from("the"), String::from("future"), ]; for val in vals { tx1.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let rx_fut = pin!(async { // --snip-- while let Some(value) = rx.recv().await { println!("received '{value}'"); } }); let tx_fut = pin!(async move { // --snip-- let vals = vec![ String::from("more"), String::from("messages"), String::from("for"), String::from("you"), ]; for val in vals { tx.send(val).unwrap(); trpl::sleep(Duration::from_secs(1)).await; } }); let futures: Vec<Pin<&mut dyn Future<Output = ()>>> = vec![tx1_fut, rx_fut, tx_fut]; trpl::join_all(futures).await; }); }
Chúng ta đã đi đến đây bằng cách bỏ qua thực tế rằng chúng ta có thể có các kiểu
Output
khác nhau. Ví dụ, trong Listing 17-20, future ẩn danh cho a
thực thi
Future<Output = u32>
, future ẩn danh cho b
thực thi Future<Output = &str>
,
và future ẩn danh cho c
thực thi Future<Output = bool>
.
extern crate trpl; // required for mdbook test fn main() { trpl::run(async { let a = async { 1u32 }; let b = async { "Hello!" }; let c = async { true }; let (a_result, b_result, c_result) = trpl::join!(a, b, c); println!("{a_result}, {b_result}, {c_result}"); }); }
Chúng ta có thể sử dụng trpl::join!
để đợi chúng, bởi vì nó cho phép chúng ta
truyền vào nhiều kiểu future và tạo ra một tuple của các kiểu đó. Chúng ta
không thể sử dụng trpl::join_all
, bởi vì nó yêu cầu tất cả các future được
truyền vào phải có cùng kiểu. Hãy nhớ rằng, lỗi đó là những gì khiến chúng ta
bắt đầu cuộc phiêu lưu này với Pin
!
Đây là một sự đánh đổi cơ bản: chúng ta có thể xử lý một số lượng động của các
future với join_all
, miễn là tất cả chúng đều có cùng kiểu, hoặc chúng ta có
thể xử lý một tập hợp cố định số lượng future với các hàm join
hoặc macro
join!
, ngay cả khi chúng có các kiểu khác nhau. Đây là cùng một kịch bản chúng
ta gặp phải khi làm việc với bất kỳ kiểu nào khác trong Rust. Future không có gì
đặc biệt, mặc dù chúng ta có một số cú pháp tốt để làm việc với chúng, và đó là
một điều tốt.
Đua Các Future
Khi chúng ta "join" các future với họ hàm join
và các macro, chúng ta yêu cầu
tất cả chúng phải hoàn thành trước khi chúng ta tiếp tục. Đôi khi, tuy nhiên,
chúng ta chỉ cần một số future từ một tập hợp hoàn thành trước khi chúng ta
tiếp tục—hơi giống với việc đua một future với một future khác.
Trong Listing 17-21, chúng ta một lần nữa sử dụng trpl::race
để chạy hai
future, slow
và fast
, đua nhau.
extern crate trpl; // required for mdbook test use std::time::Duration; fn main() { trpl::run(async { let slow = async { println!("'slow' started."); trpl::sleep(Duration::from_millis(100)).await; println!("'slow' finished."); }; let fast = async { println!("'fast' started."); trpl::sleep(Duration::from_millis(50)).await; println!("'fast' finished."); }; trpl::race(slow, fast).await; }); }
Mỗi future in một thông báo khi nó bắt đầu chạy, tạm dừng một khoảng thời gian
bằng cách gọi và đợi sleep
, và sau đó in một thông báo khác khi nó hoàn thành.
Sau đó, chúng ta truyền cả slow
và fast
cho trpl::race
và đợi một trong số
chúng hoàn thành. (Kết quả ở đây không quá đáng ngạc nhiên: fast
thắng.) Không
giống như khi chúng ta sử dụng race
trở lại trong "Our First Async
Program", chúng ta chỉ bỏ qua thể hiện Either
mà nó trả về ở đây, bởi vì tất
cả các hành vi thú vị xảy ra trong phần thân của các async block.
Chú ý rằng nếu bạn đảo ngược thứ tự các đối số cho race
, thứ tự của các thông
báo "started" sẽ thay đổi, mặc dù future fast
luôn hoàn thành trước. Đó là vì
việc thực thi của hàm race
cụ thể này là không công bằng. Nó luôn chạy các
future được truyền vào làm đối số theo thứ tự chúng được truyền vào. Các cách
thực thi khác là công bằng và sẽ chọn ngẫu nhiên future nào được poll đầu
tiên. Bất kể việc thực thi race mà chúng ta đang sử dụng có công bằng hay không,
một trong các future sẽ chạy đến điểm await
đầu tiên trong phần thân của nó
trước khi một tác vụ khác có thể bắt đầu.
Nhớ lại từ Our First Async Program rằng tại mỗi điểm await, Rust cho phép runtime có cơ hội tạm dừng tác vụ và chuyển sang một tác vụ khác nếu future đang được await chưa sẵn sàng. Điều ngược lại cũng đúng: Rust chỉ tạm dừng các async block và trả quyền kiểm soát lại cho runtime tại một điểm await. Mọi thứ giữa các điểm await là đồng bộ.
Điều đó có nghĩa là nếu bạn thực hiện một loạt công việc trong một async block mà không có điểm await, future đó sẽ chặn bất kỳ future nào khác thực hiện tiến trình. Đôi khi bạn có thể nghe điều này được gọi là một future đói các future khác. Trong một số trường hợp, điều đó có thể không phải là vấn đề lớn. Tuy nhiên, nếu bạn đang thực hiện một số loại thiết lập đắt tiền hoặc công việc chạy dài, hoặc nếu bạn có một future sẽ tiếp tục thực hiện một số tác vụ cụ thể vô thời hạn, bạn sẽ cần suy nghĩ về khi nào và ở đâu để bàn giao quyền kiểm soát cho runtime.
Cũng vậy, nếu bạn có các hoạt động chặn chạy dài, async có thể là một công cụ hữu ích để cung cấp các cách cho các phần khác nhau của chương trình liên quan với nhau.
Nhưng làm thế nào bạn sẽ bàn giao quyền kiểm soát cho runtime trong những trường hợp đó?
Nhường Quyền Kiểm Soát cho Runtime
Hãy mô phỏng một hoạt động chạy dài. Listing 17-22 giới thiệu một hàm slow
.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { // We will call `slow` here later }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Mã này sử dụng std::thread::sleep
thay vì trpl::sleep
để khi gọi slow
sẽ
chặn thread hiện tại trong một số mili giây. Chúng ta có thể sử dụng slow
để
đại diện cho các hoạt động trong thế giới thực vừa chạy lâu vừa chặn.
Trong Listing 17-23, chúng ta sử dụng slow
để mô phỏng việc thực hiện loại
công việc gắn với CPU này trong một cặp future.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' started."); slow("a", 30); slow("a", 10); slow("a", 20); trpl::sleep(Duration::from_millis(50)).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); slow("b", 10); slow("b", 15); slow("b", 350); trpl::sleep(Duration::from_millis(50)).await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Để bắt đầu, mỗi future chỉ trả lại quyền kiểm soát cho runtime sau khi thực hiện một loạt các hoạt động chậm. Nếu bạn chạy mã này, bạn sẽ thấy kết quả này:
'a' started.
'a' ran for 30ms
'a' ran for 10ms
'a' ran for 20ms
'b' started.
'b' ran for 75ms
'b' ran for 10ms
'b' ran for 15ms
'b' ran for 350ms
'a' finished.
Giống như ví dụ trước đó của chúng ta, race
vẫn kết thúc ngay khi a
hoàn
thành. Không có sự xen kẽ giữa hai future, mặc dù. Future a
thực hiện tất cả
công việc của nó cho đến khi lệnh gọi trpl::sleep
được await, sau đó future
b
thực hiện tất cả công việc của nó cho đến khi lệnh gọi trpl::sleep
của
riêng nó được await, và cuối cùng future a
hoàn thành. Để cho phép cả hai
future thực hiện tiến trình giữa các tác vụ chậm của chúng, chúng ta cần các
điểm await để chúng ta có thể bàn giao quyền kiểm soát cho runtime. Điều đó có
nghĩa là chúng ta cần thứ gì đó mà chúng ta có thể await!
Chúng ta đã có thể thấy loại bàn giao này xảy ra trong Listing 17-23: nếu chúng
ta loại bỏ trpl::sleep
ở cuối future a
, nó sẽ hoàn thành mà không có future
b
chạy chút nào. Hãy thử sử dụng hàm sleep
làm điểm khởi đầu để cho phép
các hoạt động luân phiên thực hiện tiến trình, như được hiển thị trong Listing
17-24.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let one_ms = Duration::from_millis(1); let a = async { println!("'a' started."); slow("a", 30); trpl::sleep(one_ms).await; slow("a", 10); trpl::sleep(one_ms).await; slow("a", 20); trpl::sleep(one_ms).await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::sleep(one_ms).await; slow("b", 10); trpl::sleep(one_ms).await; slow("b", 15); trpl::sleep(one_ms).await; slow("b", 350); trpl::sleep(one_ms).await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Trong Listing 17-24, chúng ta thêm các lệnh gọi trpl::sleep
với các điểm await
giữa mỗi lệnh gọi đến slow
. Bây giờ công việc của hai future được xen kẽ:
'a' started.
'a' ran for 30ms
'b' started.
'b' ran for 75ms
'a' ran for 10ms
'b' ran for 10ms
'a' ran for 20ms
'b' ran for 15ms
'a' finished.
Future a
vẫn chạy một chút trước khi bàn giao quyền kiểm soát cho b
, bởi vì
nó gọi slow
trước khi từng gọi trpl::sleep
, nhưng sau đó các future hoán đổi
qua lại mỗi khi một trong số chúng đạt đến một điểm await. Trong trường hợp này,
chúng ta đã làm điều đó sau mỗi lệnh gọi đến slow
, nhưng chúng ta có thể chia
nhỏ công việc theo bất kỳ cách nào hợp lý nhất đối với chúng ta.
Tuy nhiên, chúng ta không thực sự muốn ngủ ở đây: chúng ta muốn thực hiện tiến
trình nhanh nhất có thể. Chúng ta chỉ cần trả lại quyền kiểm soát cho runtime.
Chúng ta có thể làm điều đó trực tiếp, sử dụng hàm yield_now
. Trong Listing
17-25, chúng ta thay thế tất cả những lệnh gọi sleep
bằng yield_now
.
extern crate trpl; // required for mdbook test use std::{thread, time::Duration}; fn main() { trpl::run(async { let a = async { println!("'a' started."); slow("a", 30); trpl::yield_now().await; slow("a", 10); trpl::yield_now().await; slow("a", 20); trpl::yield_now().await; println!("'a' finished."); }; let b = async { println!("'b' started."); slow("b", 75); trpl::yield_now().await; slow("b", 10); trpl::yield_now().await; slow("b", 15); trpl::yield_now().await; slow("b", 350); trpl::yield_now().await; println!("'b' finished."); }; trpl::race(a, b).await; }); } fn slow(name: &str, ms: u64) { thread::sleep(Duration::from_millis(ms)); println!("'{name}' ran for {ms}ms"); }
Mã này vừa rõ ràng hơn về ý định thực tế vừa có thể nhanh hơn đáng kể so với
việc sử dụng sleep
, bởi vì các bộ đếm thời gian như bộ được sử dụng bởi
sleep
thường có giới hạn về mức độ chi tiết mà chúng có thể có. Phiên bản
sleep
mà chúng ta đang sử dụng, ví dụ, sẽ luôn ngủ ít nhất một mili giây, ngay
cả khi chúng ta truyền cho nó một Duration
một nano giây. Một lần nữa, máy
tính hiện đại nhanh: chúng có thể làm rất nhiều trong một mili giây!
Bạn có thể tự mình thấy điều này bằng cách thiết lập một benchmark nhỏ, chẳng hạn như bench trong Listing 17-26. (Đây không phải là một cách đặc biệt nghiêm ngặt để thực hiện kiểm tra hiệu suất, nhưng nó đủ để hiển thị sự khác biệt ở đây.)
extern crate trpl; // required for mdbook test use std::time::{Duration, Instant}; fn main() { trpl::run(async { let one_ns = Duration::from_nanos(1); let start = Instant::now(); async { for _ in 1..1000 { trpl::sleep(one_ns).await; } } .await; let time = Instant::now() - start; println!( "'sleep' version finished after {} seconds.", time.as_secs_f32() ); let start = Instant::now(); async { for _ in 1..1000 { trpl::yield_now().await; } } .await; let time = Instant::now() - start; println!( "'yield' version finished after {} seconds.", time.as_secs_f32() ); }); }
Ở đây, chúng ta bỏ qua tất cả việc in trạng thái, truyền một Duration
một nano
giây cho trpl::sleep
, và để mỗi future tự chạy, không có sự chuyển đổi giữa
các future. Sau đó, chúng ta chạy 1.000 lần lặp lại và xem future sử dụng
trpl::sleep
mất bao lâu so với future sử dụng trpl::yield_now
.
Phiên bản với yield_now
nhanh hơn nhiều!
Điều này có nghĩa là async có thể hữu ích ngay cả cho các tác vụ gắn với tính toán, tùy thuộc vào những gì chương trình của bạn đang làm, bởi vì nó cung cấp một công cụ hữu ích để cấu trúc các mối quan hệ giữa các phần khác nhau của chương trình. Đây là một hình thức đa nhiệm hợp tác, trong đó mỗi future có quyền xác định khi nào nó bàn giao quyền kiểm soát thông qua các điểm await. Mỗi future do đó cũng có trách nhiệm tránh chặn quá lâu. Trong một số hệ điều hành nhúng dựa trên Rust, đây là loại duy nhất của đa nhiệm!
Trong mã thực tế, bạn thường sẽ không thay đổi các lệnh gọi hàm với các điểm await trên mỗi dòng, tất nhiên. Mặc dù nhường quyền kiểm soát theo cách này là tương đối không tốn kém, nhưng nó không miễn phí. Trong nhiều trường hợp, việc cố gắng chia nhỏ một tác vụ gắn với tính toán có thể làm cho nó chậm hơn đáng kể, vì vậy đôi khi tốt hơn cho hiệu suất tổng thể là để một hoạt động chặn ngắn. Luôn đo lường để xem thực sự các nút thắt cổ chai hiệu suất của mã của bạn là gì. Động lực cơ bản là điều quan trọng cần ghi nhớ, tuy nhiên, nếu bạn đang thấy nhiều công việc xảy ra tuần tự mà bạn mong đợi xảy ra đồng thời!
Xây Dựng Các Sự Trừu Tượng Async Của Riêng Chúng Ta
Chúng ta cũng có thể kết hợp các future lại với nhau để tạo ra các mẫu mới. Ví
dụ, chúng ta có thể xây dựng một hàm timeout
với các khối xây dựng async mà
chúng ta đã có. Khi chúng ta hoàn thành, kết quả sẽ là một khối xây dựng khác mà
chúng ta có thể sử dụng để tạo ra thêm các sự trừu tượng async.
Listing 17-27 hiển thị cách chúng ta mong đợi timeout
này hoạt động với một
future chậm.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let slow = async {
trpl::sleep(Duration::from_millis(100)).await;
"I finished!"
};
match timeout(slow, Duration::from_millis(10)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
Hãy triển khai điều này! Để bắt đầu, hãy suy nghĩ về API cho timeout
:
- Nó cần phải là một hàm async để chúng ta có thể await nó.
- Tham số đầu tiên của nó phải là một future để chạy. Chúng ta có thể làm cho nó generic để cho phép nó hoạt động với bất kỳ future nào.
- Tham số thứ hai của nó sẽ là thời gian tối đa để đợi. Nếu chúng ta sử dụng một
Duration
, điều đó sẽ làm cho nó dễ dàng để truyền chotrpl::sleep
. - Nó sẽ trả về một
Result
. Nếu future hoàn thành thành công,Result
sẽ làOk
với giá trị được tạo ra bởi future. Nếu timeout hết hạn trước,Result
sẽ làErr
với duration mà timeout đã đợi.
Listing 17-28 hiển thị khai báo này.
extern crate trpl; // required for mdbook test
use std::time::Duration;
fn main() {
trpl::run(async {
let slow = async {
trpl::sleep(Duration::from_secs(5)).await;
"Finally finished"
};
match timeout(slow, Duration::from_millis(10)).await {
Ok(message) => println!("Succeeded with '{message}'"),
Err(duration) => {
println!("Failed after {} seconds", duration.as_secs())
}
}
});
}
async fn timeout<F: Future>(
future_to_try: F,
max_time: Duration,
) -> Result<F::Output, Duration> {
// Here is where our implementation will go!
}
Điều đó đáp ứng các mục tiêu của chúng ta cho các kiểu. Bây giờ hãy suy nghĩ về
hành vi mà chúng ta cần: chúng ta muốn đua future được truyền vào với
duration. Chúng ta có thể sử dụng trpl::sleep
để tạo một future bộ đếm thời
gian từ duration, và sử dụng trpl::race
để chạy bộ đếm thời gian đó với future
mà người gọi truyền vào.
Chúng ta cũng biết rằng race
không công bằng, polling các đối số theo thứ tự
mà chúng được truyền. Do đó, chúng ta truyền future_to_try
cho race
trước để
nó có cơ hội hoàn thành ngay cả khi max_time
là một duration rất ngắn. Nếu
future_to_try
hoàn thành trước, race
sẽ trả về Left
với đầu ra từ
future_to_try
. Nếu timer
hoàn thành trước, race
sẽ trả về Right
với đầu
ra của timer là ()
.
Trong Listing 17-29, chúng ta match trên kết quả của việc await trpl::race
.
extern crate trpl; // required for mdbook test use std::time::Duration; use trpl::Either; // --snip-- fn main() { trpl::run(async { let slow = async { trpl::sleep(Duration::from_secs(5)).await; "Finally finished" }; match timeout(slow, Duration::from_secs(2)).await { Ok(message) => println!("Succeeded with '{message}'"), Err(duration) => { println!("Failed after {} seconds", duration.as_secs()) } } }); } async fn timeout<F: Future>( future_to_try: F, max_time: Duration, ) -> Result<F::Output, Duration> { match trpl::race(future_to_try, trpl::sleep(max_time)).await { Either::Left(output) => Ok(output), Either::Right(_) => Err(max_time), } }
Nếu future_to_try
thành công và chúng ta nhận được Left(output)
, chúng ta
trả về Ok(output)
. Nếu bộ đếm thời gian ngủ hết hạn thay vào đó và chúng ta
nhận được Right(())
, chúng ta bỏ qua ()
với _
và trả về Err(max_time)
thay thế.
Với điều đó, chúng ta có một timeout
hoạt động được xây dựng từ hai trợ giúp
async khác. Nếu chúng ta chạy mã của mình, nó sẽ in chế độ thất bại sau timeout:
Failed after 2 seconds
Bởi vì future kết hợp với các future khác, bạn có thể xây dựng các công cụ thực sự mạnh mẽ sử dụng các khối xây dựng async nhỏ hơn. Ví dụ, bạn có thể sử dụng cùng cách tiếp cận này để kết hợp timeout với thử lại, và đến lượt sử dụng chúng với các hoạt động như các cuộc gọi mạng (một trong các ví dụ từ đầu chương).
Trong thực tế, bạn thường sẽ làm việc trực tiếp với async
và await
, và thứ
hai là với các hàm và macro như join
, join_all
, race
, và v.v. Bạn sẽ chỉ
cần sử dụng pin
thỉnh thoảng để sử dụng future với những API đó.
Bây giờ chúng ta đã thấy một số cách để làm việc với nhiều future cùng một lúc. Tiếp theo, chúng ta sẽ xem cách làm việc với nhiều future trong một chuỗi theo thời gian với streams. Dưới đây là một vài điều khác bạn có thể muốn xem xét trước, tuy nhiên:
-
Chúng ta đã sử dụng một
Vec
vớijoin_all
để đợi tất cả các future trong một nhóm hoàn thành. Làm thế nào bạn có thể sử dụng mộtVec
để xử lý một nhóm future tuần tự thay vào đó? Sự đánh đổi của việc làm điều đó là gì? -
Hãy xem kiểu
futures::stream::FuturesUnordered
từ cratefutures
. Sử dụng nó sẽ khác với sử dụng mộtVec
như thế nào? (Đừng lo lắng về việc nó đến từ phầnstream
của crate; nó hoạt động hoàn toàn tốt với bất kỳ tập hợp future nào.)