Hide internal Task constructors

This commit is contained in:
Héctor Ramón Jiménez 2024-07-05 01:13:28 +02:00
parent 2b19471d1c
commit 88611d7653
No known key found for this signature in database
GPG key ID: 7CC46565708259A7
11 changed files with 140 additions and 136 deletions

View file

@ -1,3 +1,4 @@
//! Create runtime tasks.
use crate::core::widget;
use crate::futures::futures::channel::mpsc;
use crate::futures::futures::channel::oneshot;
@ -29,24 +30,6 @@ impl<T> Task<T> {
Self::future(future::ready(value))
}
/// Creates a new [`Task`] that runs the given [`Future`] and produces
/// its output.
pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
where
T: 'static,
{
Self::stream(stream::once(future))
}
/// Creates a new [`Task`] that runs the given [`Stream`] and produces
/// each of its items.
pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
where
T: 'static,
{
Self(Some(boxed_stream(stream.map(Action::Output))))
}
/// Creates a [`Task`] that runs the given [`Future`] to completion and maps its
/// output with the given closure.
pub fn perform<A>(
@ -72,6 +55,24 @@ impl<T> Task<T> {
Self::stream(stream.map(f))
}
/// Creates a new [`Task`] that runs the given [`Future`] and produces
/// its output.
pub fn future(future: impl Future<Output = T> + MaybeSend + 'static) -> Self
where
T: 'static,
{
Self::stream(stream::once(future))
}
/// Creates a new [`Task`] that runs the given [`Stream`] and produces
/// each of its items.
pub fn stream(stream: impl Stream<Item = T> + MaybeSend + 'static) -> Self
where
T: 'static,
{
Self(Some(boxed_stream(stream.map(Action::Output))))
}
/// Combines the given tasks and produces a single [`Task`] that will run all of them
/// in parallel.
pub fn batch(tasks: impl IntoIterator<Item = Self>) -> Self
@ -83,66 +84,6 @@ impl<T> Task<T> {
))))
}
/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
/// its output.
pub fn widget(operation: impl widget::Operation<T> + 'static) -> Task<T>
where
T: Send + 'static,
{
Self::channel(move |sender| {
let operation =
widget::operation::map(Box::new(operation), move |value| {
let _ = sender.clone().try_send(value);
});
Action::Widget(Box::new(operation))
})
}
/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
/// produces the value fed to the [`oneshot::Sender`].
pub fn oneshot(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
where
T: MaybeSend + 'static,
{
let (sender, receiver) = oneshot::channel();
let action = f(sender);
Self(Some(boxed_stream(
stream::once(async move { action }).chain(
receiver.into_stream().filter_map(|result| async move {
Some(Action::Output(result.ok()?))
}),
),
)))
}
/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
/// produces the values fed to the [`mpsc::Sender`].
pub fn channel(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
where
T: MaybeSend + 'static,
{
let (sender, receiver) = mpsc::channel(1);
let action = f(sender);
Self(Some(boxed_stream(
stream::once(async move { action })
.chain(receiver.map(|result| Action::Output(result))),
)))
}
/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
pub fn effect(action: impl Into<Action<Never>>) -> Self {
let action = action.into();
Self(Some(boxed_stream(stream::once(async move {
action.output().expect_err("no output")
}))))
}
/// Maps the output of a [`Task`] with the given closure.
pub fn map<O>(
self,
@ -235,11 +176,6 @@ impl<T> Task<T> {
))),
}
}
/// Returns the underlying [`Stream`] of the [`Task`].
pub fn into_stream(self) -> Option<BoxStream<Action<T>>> {
self.0
}
}
impl<T> Task<Option<T>> {
@ -283,3 +219,68 @@ where
Self::none()
}
}
/// Creates a new [`Task`] that runs the given [`widget::Operation`] and produces
/// its output.
pub fn widget<T>(operation: impl widget::Operation<T> + 'static) -> Task<T>
where
T: Send + 'static,
{
channel(move |sender| {
let operation =
widget::operation::map(Box::new(operation), move |value| {
let _ = sender.clone().try_send(value);
});
Action::Widget(Box::new(operation))
})
}
/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
/// produces the value fed to the [`oneshot::Sender`].
pub fn oneshot<T>(f: impl FnOnce(oneshot::Sender<T>) -> Action<T>) -> Task<T>
where
T: MaybeSend + 'static,
{
let (sender, receiver) = oneshot::channel();
let action = f(sender);
Task(Some(boxed_stream(
stream::once(async move { action }).chain(
receiver.into_stream().filter_map(|result| async move {
Some(Action::Output(result.ok()?))
}),
),
)))
}
/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
/// produces the values fed to the [`mpsc::Sender`].
pub fn channel<T>(f: impl FnOnce(mpsc::Sender<T>) -> Action<T>) -> Task<T>
where
T: MaybeSend + 'static,
{
let (sender, receiver) = mpsc::channel(1);
let action = f(sender);
Task(Some(boxed_stream(
stream::once(async move { action })
.chain(receiver.map(|result| Action::Output(result))),
)))
}
/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
pub fn effect<T>(action: impl Into<Action<Never>>) -> Task<T> {
let action = action.into();
Task(Some(boxed_stream(stream::once(async move {
action.output().expect_err("no output")
}))))
}
/// Returns the underlying [`Stream`] of the [`Task`].
pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
task.0
}