Implement Command::run for executing a Stream to completion

This commit is contained in:
Héctor Ramón Jiménez 2023-11-29 00:12:48 +01:00
parent 133f4da901
commit 3b7d479534
No known key found for this signature in database
GPG key ID: 7CC46565708259A7
4 changed files with 48 additions and 3 deletions

View file

@ -1,7 +1,7 @@
//! Run commands and keep track of subscriptions. //! Run commands and keep track of subscriptions.
use crate::core::event::{self, Event}; use crate::core::event::{self, Event};
use crate::subscription; use crate::subscription;
use crate::{BoxFuture, Executor, MaybeSend}; use crate::{BoxFuture, BoxStream, Executor, MaybeSend};
use futures::{channel::mpsc, Sink}; use futures::{channel::mpsc, Sink};
use std::marker::PhantomData; use std::marker::PhantomData;
@ -69,6 +69,29 @@ where
self.executor.spawn(future); self.executor.spawn(future);
} }
/// Runs a [`Stream`] in the [`Runtime`] until completion.
///
/// The resulting `Message`s will be forwarded to the `Sender` of the
/// [`Runtime`].
///
/// [`Stream`]: BoxStream
pub fn run(&mut self, stream: BoxStream<Message>) {
use futures::{FutureExt, StreamExt};
let sender = self.sender.clone();
let future =
stream.map(Ok).forward(sender).map(|result| match result {
Ok(()) => (),
Err(error) => {
log::warn!(
"Stream could not run until completion: {error}"
);
}
});
self.executor.spawn(future);
}
/// Tracks a [`Subscription`] in the [`Runtime`]. /// Tracks a [`Subscription`] in the [`Runtime`].
/// ///
/// It will spawn new streams or close old ones as necessary! See /// It will spawn new streams or close old ones as necessary! See

View file

@ -4,8 +4,10 @@ mod action;
pub use action::Action; pub use action::Action;
use crate::core::widget; use crate::core::widget;
use crate::futures::futures;
use crate::futures::MaybeSend; use crate::futures::MaybeSend;
use futures::Stream;
use std::fmt; use std::fmt;
use std::future::Future; use std::future::Future;
@ -43,11 +45,21 @@ impl<T> Command<T> {
future: impl Future<Output = A> + 'static + MaybeSend, future: impl Future<Output = A> + 'static + MaybeSend,
f: impl FnOnce(A) -> T + 'static + MaybeSend, f: impl FnOnce(A) -> T + 'static + MaybeSend,
) -> Command<T> { ) -> Command<T> {
use iced_futures::futures::FutureExt; use futures::FutureExt;
Command::single(Action::Future(Box::pin(future.map(f)))) Command::single(Action::Future(Box::pin(future.map(f))))
} }
/// Creates a [`Command`] that runs the given stream to completion.
pub fn run<A>(
stream: impl Stream<Item = A> + 'static + MaybeSend,
f: impl Fn(A) -> T + 'static + MaybeSend,
) -> Command<T> {
use futures::StreamExt;
Command::single(Action::Stream(Box::pin(stream.map(f))))
}
/// Creates a [`Command`] that performs the actions of all the given /// Creates a [`Command`] that performs the actions of all the given
/// commands. /// commands.
/// ///

View file

@ -18,6 +18,11 @@ pub enum Action<T> {
/// [`Future`]: iced_futures::BoxFuture /// [`Future`]: iced_futures::BoxFuture
Future(iced_futures::BoxFuture<T>), Future(iced_futures::BoxFuture<T>),
/// Run a [`Stream`] to completion.
///
/// [`Stream`]: iced_futures::BoxStream
Stream(iced_futures::BoxStream<T>),
/// Run a clipboard action. /// Run a clipboard action.
Clipboard(clipboard::Action<T>), Clipboard(clipboard::Action<T>),
@ -52,10 +57,11 @@ impl<T> Action<T> {
A: 'static, A: 'static,
T: 'static, T: 'static,
{ {
use iced_futures::futures::FutureExt; use iced_futures::futures::{FutureExt, StreamExt};
match self { match self {
Self::Future(future) => Action::Future(Box::pin(future.map(f))), Self::Future(future) => Action::Future(Box::pin(future.map(f))),
Self::Stream(stream) => Action::Stream(Box::pin(stream.map(f))),
Self::Clipboard(action) => Action::Clipboard(action.map(f)), Self::Clipboard(action) => Action::Clipboard(action.map(f)),
Self::Window(window) => Action::Window(window.map(f)), Self::Window(window) => Action::Window(window.map(f)),
Self::System(system) => Action::System(system.map(f)), Self::System(system) => Action::System(system.map(f)),
@ -74,6 +80,7 @@ impl<T> fmt::Debug for Action<T> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
Self::Future(_) => write!(f, "Action::Future"), Self::Future(_) => write!(f, "Action::Future"),
Self::Stream(_) => write!(f, "Action::Stream"),
Self::Clipboard(action) => { Self::Clipboard(action) => {
write!(f, "Action::Clipboard({action:?})") write!(f, "Action::Clipboard({action:?})")
} }

View file

@ -736,6 +736,9 @@ pub fn run_command<A, C, E>(
command::Action::Future(future) => { command::Action::Future(future) => {
runtime.spawn(future); runtime.spawn(future);
} }
command::Action::Stream(stream) => {
runtime.run(stream);
}
command::Action::Clipboard(action) => match action { command::Action::Clipboard(action) => match action {
clipboard::Action::Read(tag) => { clipboard::Action::Read(tag) => {
let message = tag(clipboard.read()); let message = tag(clipboard.read());