Implement Task::collect

This commit is contained in:
Héctor Ramón Jiménez 2024-06-15 00:43:51 +02:00
parent 7f13fab058
commit 43033c7f83
No known key found for this signature in database
GPG key ID: 4C07CEC81AFA161F

View file

@ -47,6 +47,42 @@ impl<T> Task<T> {
Self(Some(boxed_stream(stream.map(Action::Output)))) Self(Some(boxed_stream(stream.map(Action::Output))))
} }
/// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
/// output with the given closure.
pub fn perform<A>(
future: impl Future<Output = A> + MaybeSend + 'static,
f: impl Fn(A) -> T + MaybeSend + 'static,
) -> Self
where
T: MaybeSend + 'static,
A: MaybeSend + 'static,
{
Self::future(future.map(f))
}
/// Creates a [`Task`] that runs the given [`Stream`] to completion and maps each
/// item with the given closure.
pub fn run<A>(
stream: impl Stream<Item = A> + MaybeSend + 'static,
f: impl Fn(A) -> T + MaybeSend + 'static,
) -> Self
where
T: 'static,
{
Self::stream(stream.map(f))
}
/// Combines the given tasks and produces a single [`Task`] that will run all of them
/// in parallel.
pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
where
T: 'static,
{
Self(Some(boxed_stream(stream::select_all(
tasks.into_iter().filter_map(|task| task.0),
))))
}
/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces /// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
/// its output. /// its output.
pub fn widget(operation: impl widget::Operation<T> + 'static) -> Task<T> pub fn widget(operation: impl widget::Operation<T> + 'static) -> Task<T>
@ -163,38 +199,34 @@ impl<T> Task<T> {
} }
} }
/// Creates a [`Task`] that runs the given [`Future`] to completion. /// Creates a new [`Task`] that collects all the output of the current one into a [`Vec`].
pub fn perform<A>( pub fn collect(self) -> Task<Vec<T>>
future: impl Future<Output = A> + MaybeSend + 'static,
f: impl Fn(A) -> T + MaybeSend + 'static,
) -> Self
where where
T: MaybeSend + 'static, T: MaybeSend + 'static,
A: MaybeSend + 'static,
{ {
Self::future(future.map(f)) match self.0 {
} None => Task::done(Vec::new()),
Some(stream) => Task(Some(boxed_stream(
stream::unfold(
(stream, Vec::new()),
|(mut stream, mut outputs)| async move {
let action = stream.next().await?;
/// Creates a [`Task`] that runs the given [`Stream`] to completion. match action.output() {
pub fn run<A>( Ok(output) => {
stream: impl Stream<Item = A> + MaybeSend + 'static, outputs.push(output);
f: impl Fn(A) -> T + 'static + MaybeSend,
) -> Self
where
T: 'static,
{
Self::stream(stream.map(f))
}
/// Combines the given tasks and produces a single [`Task`] that will run all of them Some((None, (stream, outputs)))
/// in parallel. }
pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self Err(action) => {
where Some((Some(action), (stream, outputs)))
T: 'static, }
{ }
Self(Some(boxed_stream(stream::select_all( },
tasks.into_iter().filter_map(|task| task.0), )
)))) .filter_map(future::ready),
))),
}
} }
/// Returns the underlying [`Stream`] of the [`Task`]. /// Returns the underlying [`Stream`] of the [`Task`].