Keyboard shortcuts

Press or to navigate between chapters

Press S or / to search in the book

Press ? to show this help

Press Esc to hide this help

Áp dụng Đồng thời với Async

Trong phần này, chúng ta sẽ áp dụng async vào một số thách thức đồng thời giống với những gì chúng ta đã giải quyết bằng thread trong chương 16. Vì chúng ta đã thảo luận về nhiều ý tưởng chính ở đó, trong phần này chúng ta sẽ tập trung vào những điểm khác biệt giữa thread và future.

Trong nhiều trường hợp, các API để làm việc với đồng thời sử dụng async rất giống với những API dùng cho thread. Trong các trường hợp khác, chúng lại khá khác biệt. Ngay cả khi các API trông giống nhau giữa thread và async, chúng thường có hành vi khác nhau—và gần như luôn có các đặc điểm hiệu suất khác nhau.

Tạo Task Mới với spawn_task

Thao tác đầu tiên chúng ta đã giải quyết trong Tạo Thread Mới với Spawn là đếm trên hai thread riêng biệt. Hãy làm điều tương tự bằng async. Crate trpl cung cấp một hàm spawn_task trông rất giống với API thread::spawn, và một hàm sleep là phiên bản async của API thread::sleep. Chúng ta có thể sử dụng chúng cùng nhau để triển khai ví dụ đếm, như trong Listing 17-6.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }
    });
}

Làm điểm bắt đầu, chúng ta thiết lập hàm main với trpl::run để hàm cấp cao nhất của chúng ta có thể là async.

Lưu ý: Từ điểm này trở đi trong chương, mọi ví dụ sẽ bao gồm mã bao bọc chính xác giống nhau với trpl::run trong main, vì vậy chúng ta thường bỏ qua nó giống như chúng ta làm với main. Đừng quên bao gồm nó trong mã của bạn!

Sau đó, chúng ta viết hai vòng lặp trong khối đó, mỗi vòng lặp chứa một lệnh gọi trpl::sleep, đợi nửa giây (500 mili giây) trước khi gửi tin nhắn tiếp theo. Chúng ta đặt một vòng lặp trong phần thân của trpl::spawn_task và vòng lặp khác trong vòng lặp for cấp cao nhất. Chúng ta cũng thêm await sau các lệnh gọi sleep.

Mã này hoạt động tương tự như triển khai dựa trên thread—bao gồm cả thực tế là bạn có thể thấy các tin nhắn xuất hiện theo thứ tự khác nhau trong terminal của bạn khi bạn chạy nó:

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!

Phiên bản này dừng ngay khi vòng lặp for trong phần thân của khối async chính kết thúc, vì task được tạo bởi spawn_task bị tắt khi hàm main kết thúc. Nếu bạn muốn nó chạy hết đến khi task hoàn thành, bạn sẽ cần sử dụng một join handle để đợi task đầu tiên hoàn thành. Với thread, chúng ta đã sử dụng phương thức join để "chặn" cho đến khi thread hoàn tất chạy. Trong Listing 17-7, chúng ta có thể sử dụng await để làm điều tương tự, bởi vì task handle chính nó là một future. Kiểu Output của nó là một Result, vì vậy chúng ta cũng unwrap nó sau khi await nó.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let handle = trpl::spawn_task(async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        });

        for i in 1..5 {
            println!("hi number {i} from the second task!");
            trpl::sleep(Duration::from_millis(500)).await;
        }

        handle.await.unwrap();
    });
}

Phiên bản cập nhật này chạy cho đến khi cả hai vòng lặp kết thúc.

hi number 1 from the second task!
hi number 1 from the first task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Cho đến nay, có vẻ như async và thread cho chúng ta cùng một kết quả cơ bản, chỉ với cú pháp khác nhau: sử dụng await thay vì gọi join trên join handle, và await các lệnh gọi sleep.

Sự khác biệt lớn hơn là chúng ta không cần phải tạo một thread hệ điều hành khác để làm điều này. Thực tế, chúng ta thậm chí không cần phải tạo một task ở đây. Bởi vì các khối async được biên dịch thành các future ẩn danh, chúng ta có thể đặt mỗi vòng lặp trong một khối async và có runtime chạy cả hai đến khi hoàn thành bằng cách sử dụng hàm trpl::join.

Trong phần Chờ Tất cả Các Thread Hoàn thành Bằng Cách Sử dụng join Handles, chúng ta đã thấy cách sử dụng phương thức join trên kiểu JoinHandle được trả về khi bạn gọi std::thread::spawn. Hàm trpl::join tương tự, nhưng dành cho future. Khi bạn đưa cho nó hai future, nó tạo ra một future mới duy nhất mà output của nó là một tuple chứa output của mỗi future bạn đã truyền vào khi cả hai đều hoàn thành. Vì vậy, trong Listing 17-8, chúng ta sử dụng trpl::join để đợi cả fut1fut2 hoàn thành. Chúng ta không await fut1fut2 mà thay vào đó là future mới được tạo ra bởi trpl::join. Chúng ta bỏ qua output, vì nó chỉ là một tuple chứa hai giá trị đơn vị.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let fut1 = async {
            for i in 1..10 {
                println!("hi number {i} from the first task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let fut2 = async {
            for i in 1..5 {
                println!("hi number {i} from the second task!");
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        trpl::join(fut1, fut2).await;
    });
}

Khi chúng ta chạy điều này, chúng ta thấy cả hai future chạy đến khi hoàn thành:

hi number 1 from the first task!
hi number 1 from the second task!
hi number 2 from the first task!
hi number 2 from the second task!
hi number 3 from the first task!
hi number 3 from the second task!
hi number 4 from the first task!
hi number 4 from the second task!
hi number 5 from the first task!
hi number 6 from the first task!
hi number 7 from the first task!
hi number 8 from the first task!
hi number 9 from the first task!

Bây giờ, bạn sẽ thấy thứ tự chính xác giống nhau mỗi lần, điều này rất khác với những gì chúng ta đã thấy với thread. Đó là bởi vì hàm trpl::joincông bằng, nghĩa là nó kiểm tra mỗi future thường xuyên như nhau, luân phiên giữa chúng, và không bao giờ để một future chạy trước nếu future khác đã sẵn sàng. Với thread, hệ điều hành quyết định thread nào được kiểm tra và cho phép nó chạy bao lâu. Với async Rust, runtime quyết định task nào cần kiểm tra. (Trong thực tế, chi tiết trở nên phức tạp vì một runtime async có thể sử dụng thread hệ điều hành bên dưới như một phần của cách nó quản lý đồng thời, vì vậy việc đảm bảo tính công bằng có thể là nhiều việc hơn cho một runtime—nhưng nó vẫn có thể thực hiện được!) Các runtime không phải đảm bảo sự công bằng cho bất kỳ thao tác nào, và chúng thường cung cấp các API khác nhau để cho phép bạn chọn liệu bạn có muốn sự công bằng hay không.

Hãy thử một số biến thể này khi await các future và xem chúng làm gì:

  • Xóa bỏ khối async xung quanh một trong hai hoặc cả hai vòng lặp.
  • Await mỗi khối async ngay sau khi định nghĩa nó.
  • Chỉ bọc vòng lặp đầu tiên trong một khối async, và await future kết quả sau phần thân của vòng lặp thứ hai.

Để thử thách hơn, hãy xem liệu bạn có thể tìm ra output sẽ như thế nào trong mỗi trường hợp trước khi chạy mã!

Đếm trên Hai Task Sử dụng Truyền Thông Điệp

Chia sẻ dữ liệu giữa các future cũng sẽ quen thuộc: chúng ta sẽ sử dụng truyền thông điệp một lần nữa, nhưng lần này với các phiên bản async của các kiểu và hàm. Chúng ta sẽ đi theo một con đường hơi khác với những gì chúng ta đã làm trong Sử dụng Truyền Thông Điệp để Chuyển Dữ liệu Giữa Các Thread để minh họa một số điểm khác biệt chính giữa đồng thời dựa trên thread và dựa trên future. Trong Listing 17-9, chúng ta sẽ bắt đầu với chỉ một khối async duy nhất—không tạo một task riêng biệt như chúng ta đã tạo một thread riêng biệt.

extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let val = String::from("hi");
        tx.send(val).unwrap();

        let received = rx.recv().await.unwrap();
        println!("received '{received}'");
    });
}

Ở đây, chúng ta sử dụng trpl::channel, một phiên bản async của API kênh đa-nhà sản xuất, đơn-người tiêu dùng mà chúng ta đã sử dụng với thread trong Chương 16. Phiên bản async của API chỉ hơi khác so với phiên bản dựa trên thread: nó sử dụng một receiver rx có thể thay đổi thay vì bất biến, và phương thức recv của nó tạo ra một future mà chúng ta cần await thay vì trực tiếp tạo ra giá trị. Bây giờ chúng ta có thể gửi tin nhắn từ sender đến receiver. Lưu ý rằng chúng ta không phải tạo một thread riêng biệt hoặc thậm chí một task; chúng ta chỉ cần await lệnh gọi rx.recv.

Phương thức đồng bộ Receiver::recv trong std::mpsc::channel chặn cho đến khi nó nhận được tin nhắn. Phương thức trpl::Receiver::recv không làm vậy, vì nó là async. Thay vì chặn, nó giao quyền kiểm soát lại cho runtime cho đến khi hoặc một tin nhắn được nhận hoặc phía gửi của kênh đóng lại. Ngược lại, chúng ta không await lệnh gọi send, vì nó không chặn. Nó không cần phải làm vậy, vì kênh mà chúng ta đang gửi vào là không giới hạn.

Lưu ý: Vì tất cả mã async này chạy trong một khối async trong một lệnh gọi trpl::run, mọi thứ trong đó có thể tránh việc chặn. Tuy nhiên, mã bên ngoài nó sẽ chặn khi hàm run trả về. Đó là toàn bộ mục đích của hàm trpl::run: nó cho phép bạn chọn nơi để chặn đối với một số mã async, và do đó là nơi để chuyển đổi giữa mã đồng bộ và async. Trong hầu hết các runtime async, run thực sự được đặt tên là block_on chính vì lý do này.

Lưu ý hai điều về ví dụ này. Đầu tiên, tin nhắn sẽ đến ngay lập tức. Thứ hai, mặc dù chúng ta sử dụng một future ở đây, nhưng chưa có sự đồng thời. Mọi thứ trong listing xảy ra tuần tự, giống như khi không có future nào tham gia.

Hãy giải quyết phần đầu tiên bằng cách gửi một loạt tin nhắn và ngủ giữa chúng, như trong Listing 17-10.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let vals = vec![
            String::from("hi"),
            String::from("from"),
            String::from("the"),
            String::from("future"),
        ];

        for val in vals {
            tx.send(val).unwrap();
            trpl::sleep(Duration::from_millis(500)).await;
        }

        while let Some(value) = rx.recv().await {
            println!("received '{value}'");
        }
    });
}

Ngoài việc gửi tin nhắn, chúng ta cần phải nhận chúng. Trong trường hợp này, vì chúng ta biết có bao nhiêu tin nhắn đến, chúng ta có thể làm điều đó thủ công bằng cách gọi rx.recv().await bốn lần. Tuy nhiên, trong thế giới thực, chúng ta thường đang đợi một số lượng tin nhắn không xác định, vì vậy chúng ta cần tiếp tục đợi cho đến khi chúng ta xác định rằng không còn tin nhắn nào nữa.

Trong Listing 16-10, chúng ta đã sử dụng một vòng lặp for để xử lý tất cả các mục nhận được từ một kênh đồng bộ. Tuy nhiên, Rust chưa có cách để viết một vòng lặp for trên một loạt các mục bất đồng bộ, vì vậy chúng ta cần sử dụng một vòng lặp mà chúng ta chưa thấy trước đây: vòng lặp điều kiện while let. Đây là phiên bản vòng lặp của cấu trúc if let mà chúng ta đã thấy trong phần Kiểm soát Luồng Ngắn gọn với if letlet else. Vòng lặp sẽ tiếp tục thực thi miễn là mẫu mà nó chỉ định tiếp tục khớp với giá trị.

Lệnh gọi rx.recv tạo ra một future, mà chúng ta await. Runtime sẽ tạm dừng future cho đến khi nó sẵn sàng. Khi một tin nhắn đến, future sẽ giải quyết thành Some(message) nhiều lần khi tin nhắn đến. Khi kênh đóng lại, bất kể bất kỳ tin nhắn nào đã đến, future sẽ thay vào đó giải quyết thành None để chỉ ra rằng không còn giá trị nào nữa và do đó chúng ta nên dừng polling—nghĩa là, dừng awaiting.

Vòng lặp while let kết hợp tất cả những điều này. Nếu kết quả của việc gọi rx.recv().awaitSome(message), chúng ta có quyền truy cập vào tin nhắn và chúng ta có thể sử dụng nó trong phần thân vòng lặp, giống như chúng ta có thể làm với if let. Nếu kết quả là None, vòng lặp kết thúc. Mỗi khi vòng lặp hoàn thành, nó chạm vào điểm await một lần nữa, vì vậy runtime tạm dừng nó một lần nữa cho đến khi một tin nhắn khác đến.

Mã bây giờ thành công gửi và nhận tất cả các tin nhắn. Đáng tiếc, vẫn còn một vài vấn đề. Một là, các tin nhắn không đến ở các khoảng thời gian nửa giây. Chúng đến cùng một lúc, 2 giây (2.000 mili giây) sau khi chúng ta bắt đầu chương trình. Vấn đề khác là chương trình này không bao giờ thoát! Thay vào đó, nó đợi mãi mãi cho các tin nhắn mới. Bạn sẽ cần phải tắt nó bằng ctrl-c.

Hãy bắt đầu bằng cách kiểm tra lý do tại sao các tin nhắn đến cùng một lúc sau khi trì hoãn đầy đủ, thay vì đến với độ trễ giữa mỗi tin. Trong một khối async nhất định, thứ tự mà các từ khóa await xuất hiện trong mã cũng là thứ tự mà chúng được thực thi khi chương trình chạy.

Chỉ có một khối async trong Listing 17-10, vì vậy mọi thứ trong đó chạy tuyến tính. Vẫn chưa có sự đồng thời. Tất cả các lệnh gọi tx.send xảy ra, xen kẽ với tất cả các lệnh gọi trpl::sleep và các điểm await liên quan. Chỉ sau đó vòng lặp while let mới đi qua bất kỳ điểm await nào trong các lệnh gọi recv.

Để có được hành vi mà chúng ta muốn, nơi độ trễ sleep xảy ra giữa mỗi tin nhắn, chúng ta cần đặt các thao tác txrx trong các khối async riêng của chúng, như trong Listing 17-11. Sau đó, runtime có thể thực thi từng khối riêng biệt bằng cách sử dụng trpl::join, giống như trong ví dụ đếm. Một lần nữa, chúng ta await kết quả của việc gọi trpl::join, không phải các future riêng lẻ. Nếu chúng ta await các future riêng lẻ một cách tuần tự, chúng ta sẽ chỉ quay lại một luồng tuần tự—chính xác những gì chúng ta đang cố gắng không làm.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

Với mã đã cập nhật trong Listing 17-11, các tin nhắn được in ở khoảng thời gian 500-mili giây, thay vì tất cả cùng một lúc sau 2 giây.

Tuy nhiên, chương trình vẫn không bao giờ thoát, vì cách mà vòng lặp while let tương tác với trpl::join:

  • Future được trả về từ trpl::join chỉ hoàn thành khi cả hai future được truyền vào nó đã hoàn thành.
  • Future tx hoàn thành khi nó hoàn tất việc ngủ sau khi gửi tin nhắn cuối cùng trong vals.
  • Future rx sẽ không hoàn thành cho đến khi vòng lặp while let kết thúc.
  • Vòng lặp while let sẽ không kết thúc cho đến khi await rx.recv tạo ra None.
  • Await rx.recv sẽ chỉ trả về None khi đầu kia của kênh được đóng lại.
  • Kênh sẽ chỉ đóng khi chúng ta gọi rx.close hoặc khi phía gửi, tx, bị drop.
  • Chúng ta không gọi rx.close ở bất kỳ đâu, và tx sẽ không bị drop cho đến khi khối async ngoài cùng được truyền vào trpl::run kết thúc.
  • Khối không thể kết thúc vì nó bị chặn trên trpl::join đang hoàn thành, điều này đưa chúng ta trở lại đầu danh sách này.

Chúng ta có thể đóng rx thủ công bằng cách gọi rx.close ở đâu đó, nhưng điều đó không có nhiều ý nghĩa. Việc dừng lại sau khi xử lý một số lượng tin nhắn tùy ý sẽ làm cho chương trình tắt, nhưng chúng ta có thể bỏ lỡ các tin nhắn. Chúng ta cần một cách khác để đảm bảo rằng tx bị drop trước khi kết thúc hàm.

Hiện tại, khối async nơi chúng ta gửi tin nhắn chỉ mượn tx vì gửi tin nhắn không yêu cầu quyền sở hữu, nhưng nếu chúng ta có thể di chuyển tx vào khối async đó, nó sẽ bị drop khi khối đó kết thúc. Trong phần Chương 13 Nắm bắt Tham chiếu hoặc Di chuyển Quyền sở hữu, bạn đã học cách sử dụng từ khóa move với closure, và, như đã thảo luận trong phần Chương 16 Sử dụng Closure move với Thread, chúng ta thường cần di chuyển dữ liệu vào closure khi làm việc với thread. Cùng động lực cơ bản áp dụng cho các khối async, vì vậy từ khóa move hoạt động với các khối async giống như với closure.

Trong Listing 17-12, chúng ta thay đổi khối được sử dụng để gửi tin nhắn từ async thành async move. Khi chúng ta chạy phiên bản mã này, nó tắt một cách thanh lịch sau khi tin nhắn cuối cùng được gửi và nhận.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        trpl::join(tx_fut, rx_fut).await;
    });
}

Kênh async này cũng là một kênh đa-nhà sản xuất, vì vậy chúng ta có thể gọi clone trên tx nếu chúng ta muốn gửi tin nhắn từ nhiều future, như trong Listing 17-13.

extern crate trpl; // required for mdbook test

use std::time::Duration;

fn main() {
    trpl::run(async {
        let (tx, mut rx) = trpl::channel();

        let tx1 = tx.clone();
        let tx1_fut = async move {
            let vals = vec![
                String::from("hi"),
                String::from("from"),
                String::from("the"),
                String::from("future"),
            ];

            for val in vals {
                tx1.send(val).unwrap();
                trpl::sleep(Duration::from_millis(500)).await;
            }
        };

        let rx_fut = async {
            while let Some(value) = rx.recv().await {
                println!("received '{value}'");
            }
        };

        let tx_fut = async move {
            let vals = vec![
                String::from("more"),
                String::from("messages"),
                String::from("for"),
                String::from("you"),
            ];

            for val in vals {
                tx.send(val).unwrap();
                trpl::sleep(Duration::from_millis(1500)).await;
            }
        };

        trpl::join3(tx1_fut, tx_fut, rx_fut).await;
    });
}

Đầu tiên, chúng ta sao chép tx, tạo ra tx1 bên ngoài khối async đầu tiên. Chúng ta di chuyển tx1 vào khối đó giống như chúng ta đã làm trước đây với tx. Sau đó, sau này, chúng ta di chuyển tx ban đầu vào một khối async mới, nơi chúng ta gửi thêm tin nhắn với độ trễ hơi chậm hơn. Chúng ta tình cờ đặt khối async mới này sau khối async để nhận tin nhắn, nhưng nó cũng có thể đi trước nó. Chìa khóa là thứ tự mà các future được await, không phải thứ tự chúng được tạo ra.

Cả hai khối async để gửi tin nhắn cần phải là các khối async move để cả txtx1 đều bị drop khi các khối đó kết thúc. Nếu không, chúng ta sẽ quay trở lại vòng lặp vô hạn mà chúng ta đã bắt đầu. Cuối cùng, chúng ta chuyển từ trpl::join sang trpl::join3 để xử lý future bổ sung.

Bây giờ chúng ta thấy tất cả các tin nhắn từ cả hai future gửi, và vì các future gửi sử dụng các độ trễ hơi khác nhau sau khi gửi, các tin nhắn cũng được nhận ở những khoảng thời gian khác nhau đó.

received 'hi'
received 'more'
received 'from'
received 'the'
received 'messages'
received 'future'
received 'for'
received 'you'

Đây là một khởi đầu tốt, nhưng nó giới hạn chúng ta ở chỉ một vài future: hai với join, hoặc ba với join3. Hãy xem chúng ta có thể làm việc với nhiều future hơn như thế nào.