Track and report Task::units to debug API

This commit is contained in:
Héctor Ramón Jiménez 2025-03-13 02:07:06 +01:00
parent 5f15522368
commit 121102e55b
No known key found for this signature in database
GPG key ID: 7CC46565708259A7
3 changed files with 121 additions and 78 deletions

View file

@ -17,12 +17,18 @@ pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream};
/// A [`Task`] _may_ produce a bunch of values of type `T`.
#[allow(missing_debug_implementations)]
#[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."]
pub struct Task<T>(Option<BoxStream<Action<T>>>);
pub struct Task<T> {
stream: Option<BoxStream<Action<T>>>,
units: usize,
}
impl<T> Task<T> {
/// Creates a [`Task`] that does nothing.
pub fn none() -> Self {
Self(None)
Self {
stream: None,
units: 0,
}
}
/// Creates a new [`Task`] that instantly produces the given value.
@ -80,9 +86,16 @@ impl<T> Task<T> {
where
T: 'static,
{
Self(Some(boxed_stream(stream::select_all(
tasks.into_iter().filter_map(|task| task.0),
))))
let select_all = stream::select_all(
tasks.into_iter().filter_map(|task| task.stream),
);
let units = select_all.len();
Self {
stream: Some(boxed_stream(select_all)),
units,
}
}
/// Maps the output of a [`Task`] with the given closure.
@ -110,21 +123,26 @@ impl<T> Task<T> {
T: MaybeSend + 'static,
O: MaybeSend + 'static,
{
Task(match self.0 {
None => None,
Some(stream) => {
Some(boxed_stream(stream.flat_map(move |action| {
match action.output() {
Ok(output) => f(output)
.0
.unwrap_or_else(|| boxed_stream(stream::empty())),
Err(action) => {
boxed_stream(stream::once(async move { action }))
Task {
stream: match self.stream {
None => None,
Some(stream) => {
Some(boxed_stream(stream.flat_map(move |action| {
match action.output() {
Ok(output) => {
f(output).stream.unwrap_or_else(|| {
boxed_stream(stream::empty())
})
}
Err(action) => boxed_stream(stream::once(
async move { action },
)),
}
}
})))
}
})
})))
}
},
units: self.units,
}
}
/// Chains a new [`Task`] to be performed once the current one finishes completely.
@ -132,11 +150,17 @@ impl<T> Task<T> {
where
T: 'static,
{
match self.0 {
match self.stream {
None => task,
Some(first) => match task.0 {
None => Task(Some(first)),
Some(second) => Task(Some(boxed_stream(first.chain(second)))),
Some(first) => match task.stream {
None => Self {
stream: Some(first),
units: self.units,
},
Some(second) => Self {
stream: Some(boxed_stream(first.chain(second))),
units: self.units + task.units,
},
},
}
}
@ -146,35 +170,39 @@ impl<T> Task<T> {
where
T: MaybeSend + 'static,
{
match self.0 {
match self.stream {
None => Task::done(Vec::new()),
Some(stream) => Task(Some(boxed_stream(
stream::unfold(
(stream, Some(Vec::new())),
move |(mut stream, outputs)| async move {
let mut outputs = outputs?;
Some(stream) => Task {
stream: Some(boxed_stream(
stream::unfold(
(stream, Some(Vec::new())),
move |(mut stream, outputs)| async move {
let mut outputs = outputs?;
let Some(action) = stream.next().await else {
return Some((
Some(Action::Output(outputs)),
(stream, None),
));
};
let Some(action) = stream.next().await else {
return Some((
Some(Action::Output(outputs)),
(stream, None),
));
};
match action.output() {
Ok(output) => {
outputs.push(output);
match action.output() {
Ok(output) => {
outputs.push(output);
Some((None, (stream, Some(outputs))))
Some((None, (stream, Some(outputs))))
}
Err(action) => Some((
Some(action),
(stream, Some(outputs)),
)),
}
Err(action) => {
Some((Some(action), (stream, Some(outputs))))
}
}
},
)
.filter_map(future::ready),
))),
},
)
.filter_map(future::ready),
)),
units: self.units,
},
}
}
@ -194,26 +222,25 @@ impl<T> Task<T> {
where
T: 'static,
{
match self.0 {
let (stream, handle) = match self.stream {
Some(stream) => {
let (stream, handle) = stream::abortable(stream);
(
Self(Some(boxed_stream(stream))),
Handle {
internal: InternalHandle::Manual(handle),
},
)
(Some(boxed_stream(stream)), InternalHandle::Manual(handle))
}
None => (
Self(None),
Handle {
internal: InternalHandle::Manual(
stream::AbortHandle::new_pair().0,
),
},
None,
InternalHandle::Manual(stream::AbortHandle::new_pair().0),
),
}
};
(
Self {
stream,
units: self.units,
},
Handle { internal: handle },
)
}
/// Creates a new [`Task`] that runs the given [`Future`] and produces
@ -231,7 +258,15 @@ impl<T> Task<T> {
where
T: 'static,
{
Self(Some(boxed_stream(stream.map(Action::Output))))
Self {
stream: Some(boxed_stream(stream.map(Action::Output))),
units: 1,
}
}
/// Returns the amount of work "units" of the [`Task`].
pub fn units(&self) -> usize {
self.units
}
}
@ -365,13 +400,14 @@ where
let action = f(sender);
Task(Some(boxed_stream(
stream::once(async move { action }).chain(
Task {
stream: Some(boxed_stream(stream::once(async move { action }).chain(
receiver.into_stream().filter_map(|result| async move {
Some(Action::Output(result.ok()?))
}),
),
)))
))),
units: 1,
}
}
/// Creates a new [`Task`] that executes the [`Action`] returned by the closure and
@ -384,22 +420,28 @@ where
let action = f(sender);
Task(Some(boxed_stream(
stream::once(async move { action })
.chain(receiver.map(|result| Action::Output(result))),
)))
Task {
stream: Some(boxed_stream(
stream::once(async move { action })
.chain(receiver.map(|result| Action::Output(result))),
)),
units: 1,
}
}
/// Creates a new [`Task`] that executes the given [`Action`] and produces no output.
pub fn effect<T>(action: impl Into<Action<Never>>) -> Task<T> {
let action = action.into();
Task(Some(boxed_stream(stream::once(async move {
action.output().expect_err("no output")
}))))
Task {
stream: Some(boxed_stream(stream::once(async move {
action.output().expect_err("no output")
}))),
units: 1,
}
}
/// Returns the underlying [`Stream`] of the [`Task`].
pub fn into_stream<T>(task: Task<T>) -> Option<BoxStream<Action<T>>> {
task.0
task.stream
}