Introduce stream::try_channel helper

This commit is contained in:
Héctor Ramón Jiménez 2024-07-11 04:08:40 +02:00
parent 70f44a6e26
commit 4ce2a207a6
No known key found for this signature in database
GPG key ID: 7CC46565708259A7

View file

@ -23,3 +23,24 @@ where
stream::select(receiver, runner)
}
/// Creates a new [`Stream`] that produces the items sent from a [`Future`]
/// that can fail to the [`mpsc::Sender`] provided to the closure.
pub fn try_channel<T, E, F>(
size: usize,
f: impl FnOnce(mpsc::Sender<T>) -> F,
) -> impl Stream<Item = Result<T, E>>
where
F: Future<Output = Result<(), E>>,
{
let (sender, receiver) = mpsc::channel(size);
let runner = stream::once(f(sender)).filter_map(|result| async {
match result {
Ok(()) => None,
Err(error) => Some(Err(error)),
}
});
stream::select(receiver.map(Ok), runner)
}