Streams: Future theo Trình tự

Cho đến nay trong chương này, chúng ta chủ yếu đã làm việc với các future riêng lẻ. Một ngoại lệ lớn là kênh async mà chúng ta đã sử dụng. Hãy nhớ lại cách chúng ta đã sử dụng bộ thu cho kênh async của chúng ta trước đó trong chương này ở phần "Message Passing". Phương thức recv async tạo ra một chuỗi các mục theo thời gian. Đây là một ví dụ của một mẫu tổng quát hơn được gọi là stream.

Chúng ta đã thấy một chuỗi các mục trong Chương 13, khi chúng ta xem xét trait Iterator trong phần The Iterator Trait and the next Method, nhưng có hai sự khác biệt giữa iterators và bộ thu kênh async. Sự khác biệt đầu tiên là thời gian: iterators là đồng bộ, trong khi bộ thu kênh là bất đồng bộ. Thứ hai là API. Khi làm việc trực tiếp với Iterator, chúng ta gọi phương thức next đồng bộ của nó. Với stream trpl::Receiver cụ thể, chúng ta đã gọi một phương thức recv bất đồng bộ thay thế. Ngoài ra, các API này cảm thấy rất giống nhau, và sự tương đồng đó không phải là sự trùng hợp. Một stream giống như một hình thức lặp bất đồng bộ. Trong khi trpl::Receiver cụ thể đợi để nhận tin nhắn, thì API stream đa năng chung rộng hơn nhiều: nó cung cấp mục tiếp theo theo cách Iterator làm, nhưng bất đồng bộ.

Sự tương đồng giữa iterators và streams trong Rust có nghĩa là chúng ta thực sự có thể tạo một stream từ bất kỳ iterator nào. Giống như với một iterator, chúng ta có thể làm việc với một stream bằng cách gọi phương thức next của nó và sau đó đợi kết quả, như trong Listing 17-30.

extern crate trpl; // required for mdbook test

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}

Chúng ta bắt đầu với một mảng các số, mà chúng ta chuyển đổi thành một iterator và sau đó gọi map trên nó để nhân đôi tất cả các giá trị. Sau đó, chúng ta chuyển đổi iterator thành stream bằng cách sử dụng hàm trpl::stream_from_iter. Tiếp theo, chúng ta lặp qua các mục trong stream khi chúng xuất hiện với vòng lặp while let.

Thật không may, khi chúng ta thử chạy mã, nó không biên dịch được, mà thay vào đó báo cáo rằng không có phương thức next nào khả dụng:

error[E0599]: no method named `next` found for struct `Iter` in the current scope
  --> src/main.rs:10:40
   |
10 |         while let Some(value) = stream.next().await {
   |                                        ^^^^
   |
   = note: the full type name has been written to 'file:///projects/async-await/target/debug/deps/async_await-575db3dd3197d257.long-type-14490787947592691573.txt'
   = note: consider using `--verbose` to print the full type name to the console
   = help: items from traits can only be used if the trait is in scope
help: the following traits which provide `next` are implemented but not in scope; perhaps you want to import one of them
   |
1  + use crate::trpl::StreamExt;
   |
1  + use futures_util::stream::stream::StreamExt;
   |
1  + use std::iter::Iterator;
   |
1  + use std::str::pattern::Searcher;
   |
help: there is a method `try_next` with a similar name
   |
10 |         while let Some(value) = stream.try_next().await {
   |                                        ~~~~~~~~

Như kết quả này giải thích, lý do cho lỗi biên dịch là chúng ta cần trait đúng trong phạm vi để có thể sử dụng phương thức next. Dựa trên cuộc thảo luận của chúng ta cho đến nay, bạn có thể mong đợi một cách hợp lý rằng trait đó là Stream, nhưng nó thực sự là StreamExt. Viết tắt của extension, Ext là một mẫu phổ biến trong cộng đồng Rust để mở rộng một trait với một trait khác.

Chúng ta sẽ giải thích các trait StreamStreamExt chi tiết hơn một chút vào cuối chương, nhưng hiện tại tất cả những gì bạn cần biết là trait Stream định nghĩa một giao diện cấp thấp mà hiệu quả là kết hợp các trait IteratorFuture. StreamExt cung cấp một tập hợp API cấp cao hơn trên Stream, bao gồm phương thức next cũng như các phương thức tiện ích khác tương tự như những phương thức được cung cấp bởi trait Iterator. StreamStreamExt chưa phải là một phần của thư viện chuẩn của Rust, nhưng hầu hết các crate trong hệ sinh thái sử dụng cùng định nghĩa.

Cách sửa lỗi biên dịch là thêm câu lệnh use cho trpl::StreamExt, như trong Listing 17-31.

extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
        let iter = values.iter().map(|n| n * 2);
        let mut stream = trpl::stream_from_iter(iter);

        while let Some(value) = stream.next().await {
            println!("The value was: {value}");
        }
    });
}

Với tất cả các phần đó được kết hợp lại, mã này hoạt động theo cách chúng ta muốn! Hơn nữa, bây giờ chúng ta có StreamExt trong phạm vi, chúng ta có thể sử dụng tất cả các phương thức tiện ích của nó, giống như với các iterator. Ví dụ, trong Listing 17-32, chúng ta sử dụng phương thức filter để lọc ra tất cả trừ bội số của ba và năm.

extern crate trpl; // required for mdbook test

use trpl::StreamExt;

fn main() {
    trpl::run(async {
        let values = 1..101;
        let iter = values.map(|n| n * 2);
        let stream = trpl::stream_from_iter(iter);

        let mut filtered =
            stream.filter(|value| value % 3 == 0 || value % 5 == 0);

        while let Some(value) = filtered.next().await {
            println!("The value was: {value}");
        }
    });
}

Tất nhiên, điều này không quá thú vị, vì chúng ta có thể làm tương tự với các iterator thông thường và không cần async gì cả. Hãy xem những gì chúng ta có thể làm mà độc đáo cho streams.

Kết hợp các Streams

Nhiều khái niệm tự nhiên được biểu diễn dưới dạng streams: các mục trở nên có sẵn trong một hàng đợi, các đoạn dữ liệu được kéo dần từ hệ thống tập tin khi tập dữ liệu đầy đủ quá lớn đối với bộ nhớ của máy tính, hoặc dữ liệu đến qua mạng theo thời gian. Vì streams là futures, chúng ta có thể sử dụng chúng với bất kỳ loại future nào khác và kết hợp chúng theo những cách thú vị. Ví dụ, chúng ta có thể gộp các sự kiện để tránh kích hoạt quá nhiều cuộc gọi mạng, đặt thời gian chờ trên chuỗi các hoạt động chạy dài, hoặc hạn chế các sự kiện giao diện người dùng để tránh làm công việc không cần thiết.

Hãy bắt đầu bằng việc xây dựng một stream nhỏ gồm các tin nhắn như một thay thế cho stream dữ liệu mà chúng ta có thể thấy từ WebSocket hoặc giao thức giao tiếp thời gian thực khác, như được hiển thị trong Listing 17-33.

extern crate trpl; // required for mdbook test

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages = get_messages();

        while let Some(message) = messages.next().await {
            println!("{message}");
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}

Đầu tiên, chúng ta tạo một hàm gọi là get_messages mà trả về impl Stream<Item = String>. Đối với cách thực hiện của nó, chúng ta tạo một kênh async, lặp qua 10 chữ cái đầu tiên của bảng chữ cái tiếng Anh, và gửi chúng qua kênh.

Chúng ta cũng sử dụng một kiểu mới: ReceiverStream, nó chuyển đổi bộ thu rx từ trpl::channel thành một Stream với một phương thức next. Trở lại trong main, chúng ta sử dụng một vòng lặp while let để in tất cả các tin nhắn từ stream.

Khi chúng ta chạy mã này, chúng ta nhận được chính xác kết quả mà chúng ta mong đợi:

Message: 'a'
Message: 'b'
Message: 'c'
Message: 'd'
Message: 'e'
Message: 'f'
Message: 'g'
Message: 'h'
Message: 'i'
Message: 'j'

Một lần nữa, chúng ta có thể làm điều này với API Receiver thông thường hoặc thậm chí API Iterator thông thường, mặc dù vậy, nên hãy thêm một tính năng yêu cầu streams: thêm một timeout áp dụng cho mọi mục trong stream, và một độ trễ trên các mục mà chúng ta phát ra, như trong Listing 17-34.

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};
use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
    for message in messages {
        tx.send(format!("Message: '{message}'")).unwrap();
    }

    ReceiverStream::new(rx)
}

Chúng ta bắt đầu bằng việc thêm một timeout vào stream với phương thức timeout, nó đến từ trait StreamExt. Sau đó, chúng ta cập nhật thân của vòng lặp while let, bởi vì stream bây giờ trả về một Result. Biến thể Ok chỉ ra rằng một tin nhắn đã đến kịp thời; biến thể Err chỉ ra rằng timeout đã hết hạn trước khi bất kỳ tin nhắn nào đến. Chúng ta match trên kết quả đó và hoặc in tin nhắn khi chúng ta nhận được nó thành công hoặc in một thông báo về timeout. Cuối cùng, chú ý rằng chúng ta ghim các tin nhắn sau khi áp dụng timeout cho chúng, bởi vì trợ giúp timeout tạo ra một stream cần được ghim để được poll.

Tuy nhiên, bởi vì không có độ trễ giữa các tin nhắn, timeout này không thay đổi hành vi của chương trình. Hãy thêm một độ trễ thay đổi cho các tin nhắn mà chúng ta gửi, như trong Listing 17-35.

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

Trong get_messages, chúng ta sử dụng phương thức iterator enumerate với mảng messages để chúng ta có thể lấy chỉ số của mỗi mục mà chúng ta đang gửi cùng với chính mục đó. Sau đó, chúng ta áp dụng một độ trễ 100 mili giây cho các mục có chỉ số chẵn và một độ trễ 300 mili giây cho các mục có chỉ số lẻ để mô phỏng các độ trễ khác nhau mà chúng ta có thể thấy từ một stream các tin nhắn trong thế giới thực. Bởi vì timeout của chúng ta là 200 mili giây, điều này sẽ ảnh hưởng đến một nửa các tin nhắn.

Để ngủ giữa các tin nhắn trong hàm get_messages mà không chặn, chúng ta cần sử dụng async. Tuy nhiên, chúng ta không thể biến get_messages thành một hàm async, bởi vì sau đó chúng ta sẽ trả về một Future<Output = Stream<Item = String>> thay vì một Stream<Item = String>>. Người gọi sẽ phải await chính get_messages để có quyền truy cập vào stream. Nhưng hãy nhớ: mọi thứ trong một future nhất định xảy ra tuyến tính; tính đồng thời xảy ra giữa các future. Awaiting get_messages sẽ yêu cầu nó gửi tất cả các tin nhắn, bao gồm cả độ trễ sleep giữa mỗi tin nhắn, trước khi trả về stream bộ thu. Kết quả là, timeout sẽ vô dụng. Sẽ không có độ trễ trong chính stream; tất cả chúng sẽ xảy ra trước khi stream thậm chí có sẵn.

Thay vào đó, chúng ta để get_messages như một hàm thông thường trả về một stream, và chúng ta spawn một task để xử lý các lệnh gọi sleep async.

Lưu ý: Gọi spawn_task theo cách này hoạt động bởi vì chúng ta đã thiết lập runtime của mình; nếu không, nó sẽ gây ra hoảng loạn. Các cách thực hiện khác lựa chọn các sự đánh đổi khác nhau: chúng có thể spawn một runtime mới và tránh hoảng loạn nhưng cuối cùng có một chút chi phí bổ sung, hoặc chúng có thể đơn giản không cung cấp một cách độc lập để spawn các task mà không cần tham chiếu đến runtime. Hãy đảm bảo rằng bạn biết sự đánh đổi mà runtime của bạn đã chọn và viết mã của bạn tương ứng!

Bây giờ mã của chúng ta có một kết quả thú vị hơn nhiều. Giữa mỗi cặp tin nhắn khác, một lỗi Problem: Elapsed(()).

Message: 'a'
Problem: Elapsed(())
Message: 'b'
Message: 'c'
Problem: Elapsed(())
Message: 'd'
Message: 'e'
Problem: Elapsed(())
Message: 'f'
Message: 'g'
Problem: Elapsed(())
Message: 'h'
Message: 'i'
Problem: Elapsed(())
Message: 'j'

Timeout không ngăn chặn các tin nhắn đến cuối cùng. Chúng ta vẫn nhận được tất cả các tin nhắn ban đầu, bởi vì kênh của chúng ta là unbounded: nó có thể chứa nhiều tin nhắn như chúng ta có thể vừa trong bộ nhớ. Nếu tin nhắn không đến trước khi timeout, bộ xử lý stream của chúng ta sẽ tính đến điều đó, nhưng khi nó poll stream một lần nữa, tin nhắn bây giờ có thể đã đến.

Bạn có thể nhận được hành vi khác nhau nếu cần bằng cách sử dụng các loại kênh khác hoặc các loại stream khác nói chung. Hãy xem một trong những điều đó trong thực tế bằng cách kết hợp một stream các khoảng thời gian với stream tin nhắn này.

Hợp nhất các Streams

Trước tiên, hãy tạo một stream khác, nó sẽ phát ra một mục mỗi mili giây nếu chúng ta để nó chạy trực tiếp. Để đơn giản, chúng ta có thể sử dụng hàm sleep để gửi một tin nhắn trên một độ trễ và kết hợp nó với cùng cách tiếp cận mà chúng ta đã sử dụng trong get_messages của việc tạo một stream từ một kênh. Sự khác biệt là lần này, chúng ta sẽ gửi lại số lượng khoảng thời gian đã trôi qua, vì vậy kiểu trả về sẽ là impl Stream<Item = u32>, và chúng ta có thể gọi hàm get_intervals (xem Listing 17-36).

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let mut messages =
            pin!(get_messages().timeout(Duration::from_millis(200)));

        while let Some(result) = messages.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

Chúng ta bắt đầu bằng việc định nghĩa một count trong task. (Chúng ta cũng có thể định nghĩa nó bên ngoài task, nhưng rõ ràng hơn là giới hạn phạm vi của bất kỳ biến nhất định nào.) Sau đó, chúng ta tạo một vòng lặp vô hạn. Mỗi lần lặp của vòng lặp ngủ bất đồng bộ trong một mili giây, tăng số đếm, và sau đó gửi nó qua kênh. Bởi vì tất cả điều này được bọc trong task được tạo bởi spawn_task, tất cả nó—bao gồm cả vòng lặp vô hạn—sẽ được dọn dẹp cùng với runtime.

Loại vòng lặp vô hạn này, nó chỉ kết thúc khi toàn bộ runtime bị tháo dỡ, khá phổ biến trong Rust async: nhiều chương trình cần tiếp tục chạy vô thời hạn. Với async, điều này không chặn bất cứ thứ gì khác, miễn là có ít nhất một điểm await trong mỗi lần lặp qua vòng lặp.

Bây giờ, trở lại async block của hàm main của chúng ta, chúng ta có thể cố gắng hợp nhất các stream messagesintervals, như trong Listing 17-37.

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals();
        let merged = messages.merge(intervals);

        while let Some(result) = merged.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

Chúng ta bắt đầu bằng cách gọi get_intervals. Sau đó, chúng ta hợp nhất các stream messagesintervals với phương thức merge, nó kết hợp nhiều stream thành một stream mà tạo ra các mục từ bất kỳ stream nguồn nào ngay khi các mục có sẵn, mà không áp đặt bất kỳ thứ tự cụ thể nào. Cuối cùng, chúng ta lặp qua stream kết hợp đó thay vì qua messages.

Tại thời điểm này, không messages cũng không intervals cần được ghim hoặc có thể thay đổi, bởi vì cả hai sẽ được kết hợp vào stream merged duy nhất. Tuy nhiên, lệnh gọi merge này không biên dịch! (Lệnh gọi next trong vòng lặp while let cũng không, nhưng chúng ta sẽ quay lại điều đó.) Đây là bởi vì hai stream có các kiểu khác nhau. Stream messages có kiểu Timeout<impl Stream<Item = String>>, trong đó Timeout là kiểu thực thi Stream cho một lệnh gọi timeout. Stream intervals có kiểu impl Stream<Item = u32>. Để hợp nhất hai stream này, chúng ta cần chuyển đổi một trong chúng để khớp với stream kia. Chúng ta sẽ cải tạo stream intervals, bởi vì messages đã ở định dạng cơ bản mà chúng ta muốn và phải xử lý các lỗi timeout (xem Listing 17-38).

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval: {count}"))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

Đầu tiên, chúng ta có thể sử dụng phương thức trợ giúp map để chuyển đổi intervals thành một chuỗi. Thứ hai, chúng ta cần phải khớp với Timeout từ messages. Bởi vì chúng ta không thực sự muốn một timeout cho intervals, tuy nhiên, chúng ta có thể chỉ tạo một timeout dài hơn các khoảng thời gian khác mà chúng ta đang sử dụng. Ở đây, chúng ta tạo một timeout 10 giây với Duration::from_secs(10). Cuối cùng, chúng ta cần làm cho stream có thể thay đổi, để các lệnh gọi next của vòng lặp while let có thể lặp qua stream, và ghim nó để an toàn khi làm như vậy. Điều đó đưa chúng ta gần đến nơi chúng ta cần phải đến. Mọi thứ kiểm tra kiểu. Tuy nhiên, nếu bạn chạy điều này, sẽ có hai vấn đề. Đầu tiên, nó sẽ không bao giờ dừng lại! Bạn sẽ cần dừng nó với ctrl-c. Thứ hai, các tin nhắn từ bảng chữ cái tiếng Anh sẽ bị chôn vùi giữa tất cả các tin nhắn bộ đếm khoảng thời gian:

--snip--
Interval: 38
Interval: 39
Interval: 40
Message: 'a'
Interval: 41
Interval: 42
Interval: 43
--snip--

Listing 17-39 hiển thị một cách để giải quyết hai vấn đề cuối cùng này.

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval: {count}"))
            .throttle(Duration::from_millis(100))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(message) => println!("{message}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    })
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];
        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            tx.send(format!("Message: '{message}'")).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;
            tx.send(count).unwrap();
        }
    });

    ReceiverStream::new(rx)
}

Đầu tiên, chúng ta sử dụng phương thức throttle trên stream intervals để nó không áp đảo stream messages. Throttling là một cách để giới hạn tốc độ mà một hàm sẽ được gọi—hoặc, trong trường hợp này, bao lâu một lần stream sẽ được poll. Mỗi 100 mili giây sẽ đủ, bởi vì đó là khoảng thời gian tin nhắn của chúng ta đến.

Để giới hạn số lượng mục mà chúng ta sẽ chấp nhận từ một stream, chúng ta áp dụng phương thức take cho stream merged, bởi vì chúng ta muốn giới hạn đầu ra cuối cùng, không chỉ một stream này hoặc stream kia.

Bây giờ khi chúng ta chạy chương trình, nó dừng lại sau khi kéo 20 mục từ stream, và các khoảng thời gian không áp đảo các tin nhắn. Chúng ta cũng không nhận được Interval: 100 hoặc Interval: 200 hoặc vân vân, mà thay vào đó nhận được Interval: 1, Interval: 2, và v.v.—mặc dù chúng ta có một stream nguồn có thể tạo ra một sự kiện mỗi mili giây. Đó là bởi vì lệnh gọi throttle tạo ra một stream mới bao bọc stream ban đầu để stream ban đầu chỉ được poll ở tốc độ throttle, không phải tốc độ "tự nhiên" của nó. Chúng ta không có một đống tin nhắn khoảng thời gian không xử lý mà chúng ta đang chọn bỏ qua. Thay vào đó, chúng ta không bao giờ tạo ra những tin nhắn khoảng thời gian đó ngay từ đầu! Đây là tính "lười biếng" vốn có của các future của Rust làm việc một lần nữa, cho phép chúng ta chọn các đặc điểm hiệu suất của mình.

Interval: 1
Message: 'a'
Interval: 2
Interval: 3
Problem: Elapsed(())
Interval: 4
Message: 'b'
Interval: 5
Message: 'c'
Interval: 6
Interval: 7
Problem: Elapsed(())
Interval: 8
Message: 'd'
Interval: 9
Message: 'e'
Interval: 10
Interval: 11
Problem: Elapsed(())
Interval: 12

Có một điều cuối cùng chúng ta cần xử lý: lỗi! Với cả hai stream dựa trên kênh này, các lệnh gọi send có thể thất bại khi phía bên kia của kênh đóng—và đó chỉ là một vấn đề về cách runtime thực thi các future tạo nên stream. Cho đến bây giờ, chúng ta đã bỏ qua khả năng này bằng cách gọi unwrap, nhưng trong một ứng dụng hoạt động tốt, chúng ta nên xử lý lỗi một cách rõ ràng, tối thiểu là bằng cách kết thúc vòng lặp để chúng ta không cố gắng gửi thêm tin nhắn. Listing 17-40 hiển thị một chiến lược lỗi đơn giản: in vấn đề và sau đó break khỏi các vòng lặp.

extern crate trpl; // required for mdbook test

use std::{pin::pin, time::Duration};

use trpl::{ReceiverStream, Stream, StreamExt};

fn main() {
    trpl::run(async {
        let messages = get_messages().timeout(Duration::from_millis(200));
        let intervals = get_intervals()
            .map(|count| format!("Interval #{count}"))
            .throttle(Duration::from_millis(500))
            .timeout(Duration::from_secs(10));
        let merged = messages.merge(intervals).take(20);
        let mut stream = pin!(merged);

        while let Some(result) = stream.next().await {
            match result {
                Ok(item) => println!("{item}"),
                Err(reason) => eprintln!("Problem: {reason:?}"),
            }
        }
    });
}

fn get_messages() -> impl Stream<Item = String> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let messages = ["a", "b", "c", "d", "e", "f", "g", "h", "i", "j"];

        for (index, message) in messages.into_iter().enumerate() {
            let time_to_sleep = if index % 2 == 0 { 100 } else { 300 };
            trpl::sleep(Duration::from_millis(time_to_sleep)).await;

            if let Err(send_error) = tx.send(format!("Message: '{message}'")) {
                eprintln!("Cannot send message '{message}': {send_error}");
                break;
            }
        }
    });

    ReceiverStream::new(rx)
}

fn get_intervals() -> impl Stream<Item = u32> {
    let (tx, rx) = trpl::channel();

    trpl::spawn_task(async move {
        let mut count = 0;
        loop {
            trpl::sleep(Duration::from_millis(1)).await;
            count += 1;

            if let Err(send_error) = tx.send(count) {
                eprintln!("Could not send interval {count}: {send_error}");
                break;
            };
        }
    });

    ReceiverStream::new(rx)
}

Như thường lệ, cách đúng để xử lý lỗi gửi tin nhắn sẽ thay đổi; chỉ cần đảm bảo bạn có một chiến lược.

Bây giờ sau khi chúng ta đã thấy rất nhiều async trong thực tế, hãy lui lại một bước và đào sâu vào một vài chi tiết về cách Future, Stream, và các trait quan trọng khác mà Rust sử dụng để làm cho async hoạt động.