From 121102e55beaa52c78a43eb5e60b07769c2b5b04 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A9ctor=20Ram=C3=B3n=20Jim=C3=A9nez?= Date: Thu, 13 Mar 2025 02:07:06 +0100 Subject: [PATCH] Track and report `Task::units` to `debug` API --- debug/src/lib.rs | 8 +- runtime/src/task.rs | 190 +++++++++++++++++++++++++++----------------- winit/src/lib.rs | 1 + 3 files changed, 121 insertions(+), 78 deletions(-) diff --git a/debug/src/lib.rs b/debug/src/lib.rs index a7d9b037..a6b73f96 100644 --- a/debug/src/lib.rs +++ b/debug/src/lib.rs @@ -17,8 +17,8 @@ pub fn theme_changed(f: impl FnOnce() -> Option) { internal::theme_changed(f); } -pub fn commands_spawned(amount: usize) { - internal::commands_spawned(amount) +pub fn tasks_spawned(amount: usize) { + internal::tasks_spawned(amount) } pub fn subscriptions_tracked(amount: usize) { @@ -114,7 +114,7 @@ mod internal { } } - pub fn commands_spawned(amount: usize) { + pub fn tasks_spawned(amount: usize) { BEACON.log(client::Event::CommandsSpawned(amount)); } @@ -215,7 +215,7 @@ mod internal { pub fn theme_changed(_f: impl FnOnce() -> Option) {} - pub fn commands_spawned(_amount: usize) {} + pub fn tasks_spawned(_amount: usize) {} pub fn subscriptions_tracked(_amount: usize) {} diff --git a/runtime/src/task.rs b/runtime/src/task.rs index fd5970ac..f8960539 100644 --- a/runtime/src/task.rs +++ b/runtime/src/task.rs @@ -17,12 +17,18 @@ pub use sipper::{Never, Sender, Sipper, Straw, sipper, stream}; /// A [`Task`] _may_ produce a bunch of values of type `T`. #[allow(missing_debug_implementations)] #[must_use = "`Task` must be returned to the runtime to take effect; normally in your `update` or `new` functions."] -pub struct Task(Option>>); +pub struct Task { + stream: Option>>, + units: usize, +} impl Task { /// Creates a [`Task`] that does nothing. pub fn none() -> Self { - Self(None) + Self { + stream: None, + units: 0, + } } /// Creates a new [`Task`] that instantly produces the given value. @@ -80,9 +86,16 @@ impl Task { where T: 'static, { - Self(Some(boxed_stream(stream::select_all( - tasks.into_iter().filter_map(|task| task.0), - )))) + let select_all = stream::select_all( + tasks.into_iter().filter_map(|task| task.stream), + ); + + let units = select_all.len(); + + Self { + stream: Some(boxed_stream(select_all)), + units, + } } /// Maps the output of a [`Task`] with the given closure. @@ -110,21 +123,26 @@ impl Task { T: MaybeSend + 'static, O: MaybeSend + 'static, { - Task(match self.0 { - None => None, - Some(stream) => { - Some(boxed_stream(stream.flat_map(move |action| { - match action.output() { - Ok(output) => f(output) - .0 - .unwrap_or_else(|| boxed_stream(stream::empty())), - Err(action) => { - boxed_stream(stream::once(async move { action })) + Task { + stream: match self.stream { + None => None, + Some(stream) => { + Some(boxed_stream(stream.flat_map(move |action| { + match action.output() { + Ok(output) => { + f(output).stream.unwrap_or_else(|| { + boxed_stream(stream::empty()) + }) + } + Err(action) => boxed_stream(stream::once( + async move { action }, + )), } - } - }))) - } - }) + }))) + } + }, + units: self.units, + } } /// Chains a new [`Task`] to be performed once the current one finishes completely. @@ -132,11 +150,17 @@ impl Task { where T: 'static, { - match self.0 { + match self.stream { None => task, - Some(first) => match task.0 { - None => Task(Some(first)), - Some(second) => Task(Some(boxed_stream(first.chain(second)))), + Some(first) => match task.stream { + None => Self { + stream: Some(first), + units: self.units, + }, + Some(second) => Self { + stream: Some(boxed_stream(first.chain(second))), + units: self.units + task.units, + }, }, } } @@ -146,35 +170,39 @@ impl Task { where T: MaybeSend + 'static, { - match self.0 { + match self.stream { None => Task::done(Vec::new()), - Some(stream) => Task(Some(boxed_stream( - stream::unfold( - (stream, Some(Vec::new())), - move |(mut stream, outputs)| async move { - let mut outputs = outputs?; + Some(stream) => Task { + stream: Some(boxed_stream( + stream::unfold( + (stream, Some(Vec::new())), + move |(mut stream, outputs)| async move { + let mut outputs = outputs?; - let Some(action) = stream.next().await else { - return Some(( - Some(Action::Output(outputs)), - (stream, None), - )); - }; + let Some(action) = stream.next().await else { + return Some(( + Some(Action::Output(outputs)), + (stream, None), + )); + }; - match action.output() { - Ok(output) => { - outputs.push(output); + match action.output() { + Ok(output) => { + outputs.push(output); - Some((None, (stream, Some(outputs)))) + Some((None, (stream, Some(outputs)))) + } + Err(action) => Some(( + Some(action), + (stream, Some(outputs)), + )), } - Err(action) => { - Some((Some(action), (stream, Some(outputs)))) - } - } - }, - ) - .filter_map(future::ready), - ))), + }, + ) + .filter_map(future::ready), + )), + units: self.units, + }, } } @@ -194,26 +222,25 @@ impl Task { where T: 'static, { - match self.0 { + let (stream, handle) = match self.stream { Some(stream) => { let (stream, handle) = stream::abortable(stream); - ( - Self(Some(boxed_stream(stream))), - Handle { - internal: InternalHandle::Manual(handle), - }, - ) + (Some(boxed_stream(stream)), InternalHandle::Manual(handle)) } None => ( - Self(None), - Handle { - internal: InternalHandle::Manual( - stream::AbortHandle::new_pair().0, - ), - }, + None, + InternalHandle::Manual(stream::AbortHandle::new_pair().0), ), - } + }; + + ( + Self { + stream, + units: self.units, + }, + Handle { internal: handle }, + ) } /// Creates a new [`Task`] that runs the given [`Future`] and produces @@ -231,7 +258,15 @@ impl Task { where T: 'static, { - Self(Some(boxed_stream(stream.map(Action::Output)))) + Self { + stream: Some(boxed_stream(stream.map(Action::Output))), + units: 1, + } + } + + /// Returns the amount of work "units" of the [`Task`]. + pub fn units(&self) -> usize { + self.units } } @@ -365,13 +400,14 @@ where let action = f(sender); - Task(Some(boxed_stream( - stream::once(async move { action }).chain( + Task { + stream: Some(boxed_stream(stream::once(async move { action }).chain( receiver.into_stream().filter_map(|result| async move { Some(Action::Output(result.ok()?)) }), - ), - ))) + ))), + units: 1, + } } /// Creates a new [`Task`] that executes the [`Action`] returned by the closure and @@ -384,22 +420,28 @@ where let action = f(sender); - Task(Some(boxed_stream( - stream::once(async move { action }) - .chain(receiver.map(|result| Action::Output(result))), - ))) + Task { + stream: Some(boxed_stream( + stream::once(async move { action }) + .chain(receiver.map(|result| Action::Output(result))), + )), + units: 1, + } } /// Creates a new [`Task`] that executes the given [`Action`] and produces no output. pub fn effect(action: impl Into>) -> Task { let action = action.into(); - Task(Some(boxed_stream(stream::once(async move { - action.output().expect_err("no output") - })))) + Task { + stream: Some(boxed_stream(stream::once(async move { + action.output().expect_err("no output") + }))), + units: 1, + } } /// Returns the underlying [`Stream`] of the [`Task`]. pub fn into_stream(task: Task) -> Option>> { - task.0 + task.stream } diff --git a/winit/src/lib.rs b/winit/src/lib.rs index 2c79e7f3..33a58c63 100644 --- a/winit/src/lib.rs +++ b/winit/src/lib.rs @@ -1100,6 +1100,7 @@ fn update( for message in messages.drain(..) { let update_span = debug::update(&message); let task = runtime.enter(|| program.update(message)); + debug::tasks_spawned(task.units()); update_span.finish(); if let Some(stream) = runtime::task::into_stream(task) {