Hide Subscription internals

.. and introduce `stream::channel` helper
This commit is contained in:
Héctor Ramón Jiménez 2024-07-05 02:15:13 +02:00
parent e50aa03edc
commit 8bc49cd886
No known key found for this signature in database
GPG key ID: 7CC46565708259A7
11 changed files with 268 additions and 246 deletions

View file

@ -1,4 +1,5 @@
use iced::subscription; use iced::futures;
use iced::Subscription;
use std::hash::Hash; use std::hash::Hash;
@ -7,9 +8,14 @@ pub fn file<I: 'static + Hash + Copy + Send + Sync, T: ToString>(
id: I, id: I,
url: T, url: T,
) -> iced::Subscription<(I, Progress)> { ) -> iced::Subscription<(I, Progress)> {
subscription::unfold(id, State::Ready(url.to_string()), move |state| { Subscription::run_with_id(
download(id, state) id,
}) futures::stream::unfold(State::Ready(url.to_string()), move |state| {
use iced::futures::FutureExt;
download(id, state).map(Some)
}),
)
} }
async fn download<I: Copy>(id: I, state: State) -> ((I, Progress), State) { async fn download<I: Copy>(id: I, state: State) -> ((I, Progress), State) {

View file

@ -1,87 +1,79 @@
pub mod server; pub mod server;
use iced::futures; use iced::futures;
use iced::subscription::{self, Subscription}; use iced::stream;
use iced::widget::text; use iced::widget::text;
use futures::channel::mpsc; use futures::channel::mpsc;
use futures::sink::SinkExt; use futures::sink::SinkExt;
use futures::stream::StreamExt; use futures::stream::{Stream, StreamExt};
use async_tungstenite::tungstenite; use async_tungstenite::tungstenite;
use std::fmt; use std::fmt;
pub fn connect() -> Subscription<Event> { pub fn connect() -> impl Stream<Item = Event> {
struct Connect; stream::channel(100, |mut output| async move {
let mut state = State::Disconnected;
subscription::channel( loop {
std::any::TypeId::of::<Connect>(), match &mut state {
100, State::Disconnected => {
|mut output| async move { const ECHO_SERVER: &str = "ws://127.0.0.1:3030";
let mut state = State::Disconnected;
loop { match async_tungstenite::tokio::connect_async(ECHO_SERVER)
match &mut state {
State::Disconnected => {
const ECHO_SERVER: &str = "ws://127.0.0.1:3030";
match async_tungstenite::tokio::connect_async(
ECHO_SERVER,
)
.await .await
{ {
Ok((websocket, _)) => { Ok((websocket, _)) => {
let (sender, receiver) = mpsc::channel(100); let (sender, receiver) = mpsc::channel(100);
let _ = output let _ = output
.send(Event::Connected(Connection(sender))) .send(Event::Connected(Connection(sender)))
.await;
state = State::Connected(websocket, receiver);
}
Err(_) => {
tokio::time::sleep(
tokio::time::Duration::from_secs(1),
)
.await; .await;
let _ = output.send(Event::Disconnected).await; state = State::Connected(websocket, receiver);
} }
Err(_) => {
tokio::time::sleep(
tokio::time::Duration::from_secs(1),
)
.await;
let _ = output.send(Event::Disconnected).await;
} }
} }
State::Connected(websocket, input) => { }
let mut fused_websocket = websocket.by_ref().fuse(); State::Connected(websocket, input) => {
let mut fused_websocket = websocket.by_ref().fuse();
futures::select! { futures::select! {
received = fused_websocket.select_next_some() => { received = fused_websocket.select_next_some() => {
match received { match received {
Ok(tungstenite::Message::Text(message)) => { Ok(tungstenite::Message::Text(message)) => {
let _ = output.send(Event::MessageReceived(Message::User(message))).await; let _ = output.send(Event::MessageReceived(Message::User(message))).await;
}
Err(_) => {
let _ = output.send(Event::Disconnected).await;
state = State::Disconnected;
}
Ok(_) => continue,
} }
} Err(_) => {
message = input.select_next_some() => {
let result = websocket.send(tungstenite::Message::Text(message.to_string())).await;
if result.is_err() {
let _ = output.send(Event::Disconnected).await; let _ = output.send(Event::Disconnected).await;
state = State::Disconnected; state = State::Disconnected;
} }
Ok(_) => continue,
}
}
message = input.select_next_some() => {
let result = websocket.send(tungstenite::Message::Text(message.to_string())).await;
if result.is_err() {
let _ = output.send(Event::Disconnected).await;
state = State::Disconnected;
} }
} }
} }
} }
} }
}, }
) })
} }
#[derive(Debug)] #[derive(Debug)]

View file

@ -83,7 +83,7 @@ impl WebSocket {
} }
fn subscription(&self) -> Subscription<Message> { fn subscription(&self) -> Subscription<Message> {
echo::connect().map(Message::Echo) Subscription::run(echo::connect).map(Message::Echo)
} }
fn view(&self) -> Element<Message> { fn view(&self) -> Element<Message> {

View file

@ -27,7 +27,7 @@ pub mod time {
pub fn every( pub fn every(
duration: std::time::Duration, duration: std::time::Duration,
) -> Subscription<std::time::Instant> { ) -> Subscription<std::time::Instant> {
Subscription::from_recipe(Every(duration)) subscription::from_recipe(Every(duration))
} }
#[derive(Debug)] #[derive(Debug)]

View file

@ -26,7 +26,7 @@ pub mod time {
pub fn every( pub fn every(
duration: std::time::Duration, duration: std::time::Duration,
) -> Subscription<std::time::Instant> { ) -> Subscription<std::time::Instant> {
Subscription::from_recipe(Every(duration)) subscription::from_recipe(Every(duration))
} }
#[derive(Debug)] #[derive(Debug)]

View file

@ -31,7 +31,7 @@ pub mod time {
pub fn every( pub fn every(
duration: std::time::Duration, duration: std::time::Duration,
) -> Subscription<std::time::Instant> { ) -> Subscription<std::time::Instant> {
Subscription::from_recipe(Every(duration)) subscription::from_recipe(Every(duration))
} }
#[derive(Debug)] #[derive(Debug)]

View file

@ -15,6 +15,7 @@ pub mod backend;
pub mod event; pub mod event;
pub mod executor; pub mod executor;
pub mod keyboard; pub mod keyboard;
pub mod stream;
pub mod subscription; pub mod subscription;
pub use executor::Executor; pub use executor::Executor;

26
futures/src/stream.rs Normal file
View file

@ -0,0 +1,26 @@
//! Create asynchronous streams of data.
use futures::channel::mpsc;
use futures::never::Never;
use futures::stream::{self, Stream, StreamExt};
use std::future::Future;
/// Creates a new [`Stream`] that produces the items sent from a [`Future`]
/// to the [`mpsc::Sender`] provided to the closure.
///
/// This is a more ergonomic [`stream::unfold`], which allows you to go
/// from the "world of futures" to the "world of streams" by simply looping
/// and publishing to an async channel from inside a [`Future`].
pub fn channel<T, F>(
size: usize,
f: impl FnOnce(mpsc::Sender<T>) -> F,
) -> impl Stream<Item = T>
where
F: Future<Output = Never>,
{
let (sender, receiver) = mpsc::channel(size);
let runner = stream::once(f(sender)).map(|_| unreachable!());
stream::select(receiver, runner)
}

View file

@ -5,11 +5,9 @@ pub use tracker::Tracker;
use crate::core::event; use crate::core::event;
use crate::core::window; use crate::core::window;
use crate::futures::{Future, Stream}; use crate::futures::Stream;
use crate::{BoxStream, MaybeSend}; use crate::{BoxStream, MaybeSend};
use futures::channel::mpsc;
use futures::never::Never;
use std::any::TypeId; use std::any::TypeId;
use std::hash::Hash; use std::hash::Hash;
@ -61,20 +59,66 @@ pub type Hasher = rustc_hash::FxHasher;
/// A request to listen to external events. /// A request to listen to external events.
/// ///
/// Besides performing async actions on demand with `Command`, most /// Besides performing async actions on demand with `Task`, most
/// applications also need to listen to external events passively. /// applications also need to listen to external events passively.
/// ///
/// A [`Subscription`] is normally provided to some runtime, like a `Command`, /// A [`Subscription`] is normally provided to some runtime, like a `Task`,
/// and it will generate events as long as the user keeps requesting it. /// and it will generate events as long as the user keeps requesting it.
/// ///
/// For instance, you can use a [`Subscription`] to listen to a `WebSocket` /// For instance, you can use a [`Subscription`] to listen to a `WebSocket`
/// connection, keyboard presses, mouse events, time ticks, etc. /// connection, keyboard presses, mouse events, time ticks, etc.
///
/// # The Lifetime of a [`Subscription`]
/// Much like a [`Future`] or a [`Stream`], a [`Subscription`] does not produce any effects
/// on its own. For a [`Subscription`] to run, it must be returned to the iced runtime—normally
/// in the `subscription` function of an `application` or a `daemon`.
///
/// When a [`Subscription`] is provided to the runtime for the first time, the runtime will
/// start running it asynchronously. Running a [`Subscription`] consists in building its underlying
/// [`Stream`] and executing it in an async runtime.
///
/// Therefore, you can think of a [`Subscription`] as a "stream builder". It simply represents a way
/// to build a certain [`Stream`] together with some way to _identify_ it.
///
/// Identification is important because when a specific [`Subscription`] stops being returned to the
/// iced runtime, the runtime will kill its associated [`Stream`]. The runtime uses the identity of a
/// [`Subscription`] to keep track of it.
///
/// This way, iced allows you to declaratively __subscribe__ to particular streams of data temporarily
/// and whenever necessary.
///
/// ```
/// # mod iced {
/// # pub mod time {
/// # pub use iced_futures::backend::default::time::every;
/// # pub use std::time::{Duration, Instant};
/// # }
/// #
/// # pub use iced_futures::Subscription;
/// # }
/// use iced::time::{self, Duration, Instant};
/// use iced::Subscription;
///
/// struct State {
/// timer_enabled: bool,
/// }
///
/// fn subscription(state: &State) -> Subscription<Instant> {
/// if state.timer_enabled {
/// time::every(Duration::from_secs(1))
/// } else {
/// Subscription::none()
/// }
/// }
/// ```
///
/// [`Future`]: std::future::Future
#[must_use = "`Subscription` must be returned to runtime to take effect"] #[must_use = "`Subscription` must be returned to runtime to take effect"]
pub struct Subscription<Message> { pub struct Subscription<T> {
recipes: Vec<Box<dyn Recipe<Output = Message>>>, recipes: Vec<Box<dyn Recipe<Output = T>>>,
} }
impl<Message> Subscription<Message> { impl<T> Subscription<T> {
/// Returns an empty [`Subscription`] that will not produce any output. /// Returns an empty [`Subscription`] that will not produce any output.
pub fn none() -> Self { pub fn none() -> Self {
Self { Self {
@ -82,19 +126,102 @@ impl<Message> Subscription<Message> {
} }
} }
/// Creates a [`Subscription`] from a [`Recipe`] describing it. /// Returns a [`Subscription`] that will call the given function to create and
pub fn from_recipe( /// asynchronously run the given [`Stream`].
recipe: impl Recipe<Output = Message> + 'static, ///
) -> Self { /// # Creating an asynchronous worker with bidirectional communication
Self { /// You can leverage this helper to create a [`Subscription`] that spawns
recipes: vec![Box::new(recipe)], /// an asynchronous worker in the background and establish a channel of
} /// communication with an `iced` application.
///
/// You can achieve this by creating an `mpsc` channel inside the closure
/// and returning the `Sender` as a `Message` for the `Application`:
///
/// ```
/// use iced_futures::subscription::{self, Subscription};
/// use iced_futures::stream;
/// use iced_futures::futures::channel::mpsc;
/// use iced_futures::futures::sink::SinkExt;
/// use iced_futures::futures::Stream;
///
/// pub enum Event {
/// Ready(mpsc::Sender<Input>),
/// WorkFinished,
/// // ...
/// }
///
/// enum Input {
/// DoSomeWork,
/// // ...
/// }
///
/// fn some_worker() -> impl Stream<Item = Event> {
/// stream::channel(100, |mut output| async move {
/// // Create channel
/// let (sender, mut receiver) = mpsc::channel(100);
///
/// // Send the sender back to the application
/// output.send(Event::Ready(sender)).await;
///
/// loop {
/// use iced_futures::futures::StreamExt;
///
/// // Read next input sent from `Application`
/// let input = receiver.select_next_some().await;
///
/// match input {
/// Input::DoSomeWork => {
/// // Do some async work...
///
/// // Finally, we can optionally produce a message to tell the
/// // `Application` the work is done
/// output.send(Event::WorkFinished).await;
/// }
/// }
/// }
/// })
/// }
///
/// fn subscription() -> Subscription<Event> {
/// Subscription::run(some_worker)
/// }
/// ```
///
/// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
/// connection open.
///
/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.12/examples/websocket
pub fn run<S>(builder: fn() -> S) -> Self
where
S: Stream<Item = T> + MaybeSend + 'static,
T: 'static,
{
from_recipe(Runner {
id: builder,
spawn: move |_| builder(),
})
}
/// Returns a [`Subscription`] that will create and asynchronously run the
/// given [`Stream`].
///
/// The `id` will be used to uniquely identify the [`Subscription`].
pub fn run_with_id<I, S>(id: I, stream: S) -> Subscription<T>
where
I: Hash + 'static,
S: Stream<Item = T> + MaybeSend + 'static,
T: 'static,
{
from_recipe(Runner {
id,
spawn: move |_| stream,
})
} }
/// Batches all the provided subscriptions and returns the resulting /// Batches all the provided subscriptions and returns the resulting
/// [`Subscription`]. /// [`Subscription`].
pub fn batch( pub fn batch(
subscriptions: impl IntoIterator<Item = Subscription<Message>>, subscriptions: impl IntoIterator<Item = Subscription<T>>,
) -> Self { ) -> Self {
Self { Self {
recipes: subscriptions recipes: subscriptions
@ -104,18 +231,13 @@ impl<Message> Subscription<Message> {
} }
} }
/// Returns the different recipes of the [`Subscription`].
pub fn into_recipes(self) -> Vec<Box<dyn Recipe<Output = Message>>> {
self.recipes
}
/// Adds a value to the [`Subscription`] context. /// Adds a value to the [`Subscription`] context.
/// ///
/// The value will be part of the identity of a [`Subscription`]. /// The value will be part of the identity of a [`Subscription`].
pub fn with<T>(mut self, value: T) -> Subscription<(T, Message)> pub fn with<A>(mut self, value: A) -> Subscription<(A, T)>
where where
Message: 'static, T: 'static,
T: std::hash::Hash + Clone + Send + Sync + 'static, A: std::hash::Hash + Clone + Send + Sync + 'static,
{ {
Subscription { Subscription {
recipes: self recipes: self
@ -123,7 +245,7 @@ impl<Message> Subscription<Message> {
.drain(..) .drain(..)
.map(|recipe| { .map(|recipe| {
Box::new(With::new(recipe, value.clone())) Box::new(With::new(recipe, value.clone()))
as Box<dyn Recipe<Output = (T, Message)>> as Box<dyn Recipe<Output = (A, T)>>
}) })
.collect(), .collect(),
} }
@ -136,8 +258,8 @@ impl<Message> Subscription<Message> {
/// will panic in debug mode otherwise. /// will panic in debug mode otherwise.
pub fn map<F, A>(mut self, f: F) -> Subscription<A> pub fn map<F, A>(mut self, f: F) -> Subscription<A>
where where
Message: 'static, T: 'static,
F: Fn(Message) -> A + MaybeSend + Clone + 'static, F: Fn(T) -> A + MaybeSend + Clone + 'static,
A: 'static, A: 'static,
{ {
debug_assert!( debug_assert!(
@ -159,7 +281,23 @@ impl<Message> Subscription<Message> {
} }
} }
impl<Message> std::fmt::Debug for Subscription<Message> { /// Creates a [`Subscription`] from a [`Recipe`] describing it.
pub fn from_recipe<T>(
recipe: impl Recipe<Output = T> + 'static,
) -> Subscription<T> {
Subscription {
recipes: vec![Box::new(recipe)],
}
}
/// Returns the different recipes of the [`Subscription`].
pub fn into_recipes<T>(
subscription: Subscription<T>,
) -> Vec<Box<dyn Recipe<Output = T>>> {
subscription.recipes
}
impl<T> std::fmt::Debug for Subscription<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Subscription").finish() f.debug_struct("Subscription").finish()
} }
@ -273,65 +411,13 @@ where
} }
} }
/// Returns a [`Subscription`] that will call the given function to create and pub(crate) fn filter_map<I, F, T>(id: I, f: F) -> Subscription<T>
/// asynchronously run the given [`Stream`].
pub fn run<S, Message>(builder: fn() -> S) -> Subscription<Message>
where
S: Stream<Item = Message> + MaybeSend + 'static,
Message: 'static,
{
Subscription::from_recipe(Runner {
id: builder,
spawn: move |_| builder(),
})
}
/// Returns a [`Subscription`] that will create and asynchronously run the
/// given [`Stream`].
///
/// The `id` will be used to uniquely identify the [`Subscription`].
pub fn run_with_id<I, S, Message>(id: I, stream: S) -> Subscription<Message>
where where
I: Hash + 'static, I: Hash + 'static,
S: Stream<Item = Message> + MaybeSend + 'static, F: Fn(Event) -> Option<T> + MaybeSend + 'static,
Message: 'static, T: 'static + MaybeSend,
{ {
Subscription::from_recipe(Runner { from_recipe(Runner {
id,
spawn: move |_| stream,
})
}
/// Returns a [`Subscription`] that will create and asynchronously run a
/// [`Stream`] that will call the provided closure to produce every `Message`.
///
/// The `id` will be used to uniquely identify the [`Subscription`].
pub fn unfold<I, T, Fut, Message>(
id: I,
initial: T,
mut f: impl FnMut(T) -> Fut + MaybeSend + Sync + 'static,
) -> Subscription<Message>
where
I: Hash + 'static,
T: MaybeSend + 'static,
Fut: Future<Output = (Message, T)> + MaybeSend + 'static,
Message: 'static + MaybeSend,
{
use futures::future::FutureExt;
run_with_id(
id,
futures::stream::unfold(initial, move |state| f(state).map(Some)),
)
}
pub(crate) fn filter_map<I, F, Message>(id: I, f: F) -> Subscription<Message>
where
I: Hash + 'static,
F: Fn(Event) -> Option<Message> + MaybeSend + 'static,
Message: 'static + MaybeSend,
{
Subscription::from_recipe(Runner {
id, id,
spawn: |events| { spawn: |events| {
use futures::future; use futures::future;
@ -342,107 +428,22 @@ where
}) })
} }
/// Creates a [`Subscription`] that publishes the events sent from a [`Future`] struct Runner<I, F, S, T>
/// to an [`mpsc::Sender`] with the given bounds.
///
/// # Creating an asynchronous worker with bidirectional communication
/// You can leverage this helper to create a [`Subscription`] that spawns
/// an asynchronous worker in the background and establish a channel of
/// communication with an `iced` application.
///
/// You can achieve this by creating an `mpsc` channel inside the closure
/// and returning the `Sender` as a `Message` for the `Application`:
///
/// ```
/// use iced_futures::subscription::{self, Subscription};
/// use iced_futures::futures::channel::mpsc;
/// use iced_futures::futures::sink::SinkExt;
///
/// pub enum Event {
/// Ready(mpsc::Sender<Input>),
/// WorkFinished,
/// // ...
/// }
///
/// enum Input {
/// DoSomeWork,
/// // ...
/// }
///
/// fn some_worker() -> Subscription<Event> {
/// struct SomeWorker;
///
/// subscription::channel(std::any::TypeId::of::<SomeWorker>(), 100, |mut output| async move {
/// // Create channel
/// let (sender, mut receiver) = mpsc::channel(100);
///
/// // Send the sender back to the application
/// output.send(Event::Ready(sender)).await;
///
/// loop {
/// use iced_futures::futures::StreamExt;
///
/// // Read next input sent from `Application`
/// let input = receiver.select_next_some().await;
///
/// match input {
/// Input::DoSomeWork => {
/// // Do some async work...
///
/// // Finally, we can optionally produce a message to tell the
/// // `Application` the work is done
/// output.send(Event::WorkFinished).await;
/// }
/// }
/// }
/// })
/// }
/// ```
///
/// Check out the [`websocket`] example, which showcases this pattern to maintain a `WebSocket`
/// connection open.
///
/// [`websocket`]: https://github.com/iced-rs/iced/tree/0.12/examples/websocket
pub fn channel<I, Fut, Message>(
id: I,
size: usize,
f: impl FnOnce(mpsc::Sender<Message>) -> Fut + MaybeSend + 'static,
) -> Subscription<Message>
where
I: Hash + 'static,
Fut: Future<Output = Never> + MaybeSend + 'static,
Message: 'static + MaybeSend,
{
use futures::stream::{self, StreamExt};
Subscription::from_recipe(Runner {
id,
spawn: move |_| {
let (sender, receiver) = mpsc::channel(size);
let runner = stream::once(f(sender)).map(|_| unreachable!());
stream::select(receiver, runner)
},
})
}
struct Runner<I, F, S, Message>
where where
F: FnOnce(EventStream) -> S, F: FnOnce(EventStream) -> S,
S: Stream<Item = Message>, S: Stream<Item = T>,
{ {
id: I, id: I,
spawn: F, spawn: F,
} }
impl<I, S, F, Message> Recipe for Runner<I, F, S, Message> impl<I, F, S, T> Recipe for Runner<I, F, S, T>
where where
I: Hash + 'static, I: Hash + 'static,
F: FnOnce(EventStream) -> S, F: FnOnce(EventStream) -> S,
S: Stream<Item = Message> + MaybeSend + 'static, S: Stream<Item = T> + MaybeSend + 'static,
{ {
type Output = Message; type Output = T;
fn hash(&self, state: &mut Hasher) { fn hash(&self, state: &mut Hasher) {
std::any::TypeId::of::<I>().hash(state); std::any::TypeId::of::<I>().hash(state);

View file

@ -175,6 +175,7 @@ use iced_winit::core;
use iced_winit::runtime; use iced_winit::runtime;
pub use iced_futures::futures; pub use iced_futures::futures;
pub use iced_futures::stream;
#[cfg(feature = "highlighter")] #[cfg(feature = "highlighter")]
pub use iced_highlighter as highlighter; pub use iced_highlighter as highlighter;
@ -202,6 +203,7 @@ pub use crate::core::{
Theme, Transformation, Vector, Theme, Transformation, Vector,
}; };
pub use crate::runtime::{exit, Task}; pub use crate::runtime::{exit, Task};
pub use iced_futures::Subscription;
pub mod clipboard { pub mod clipboard {
//! Access the clipboard. //! Access the clipboard.
@ -255,13 +257,6 @@ pub mod mouse {
}; };
} }
pub mod subscription {
//! Listen to external events in your application.
pub use iced_futures::subscription::{
channel, run, run_with_id, unfold, Subscription,
};
}
#[cfg(feature = "system")] #[cfg(feature = "system")]
pub mod system { pub mod system {
//! Retrieve system information. //! Retrieve system information.
@ -314,7 +309,6 @@ pub use executor::Executor;
pub use font::Font; pub use font::Font;
pub use renderer::Renderer; pub use renderer::Renderer;
pub use settings::Settings; pub use settings::Settings;
pub use subscription::Subscription;
#[doc(inline)] #[doc(inline)]
pub use application::application; pub use application::application;

View file

@ -207,7 +207,9 @@ where
runtime.run(stream); runtime.run(stream);
} }
runtime.track(program.subscription().map(Action::Output).into_recipes()); runtime.track(subscription::into_recipes(
program.subscription().map(Action::Output),
));
let (boot_sender, boot_receiver) = oneshot::channel(); let (boot_sender, boot_receiver) = oneshot::channel();
let (event_sender, event_receiver) = mpsc::unbounded(); let (event_sender, event_receiver) = mpsc::unbounded();
@ -1120,7 +1122,7 @@ fn update<P: Program, E: Executor>(
} }
let subscription = program.subscription(); let subscription = program.subscription();
runtime.track(subscription.map(Action::Output).into_recipes()); runtime.track(subscription::into_recipes(subscription.map(Action::Output)));
} }
fn run_action<P, C>( fn run_action<P, C>(