Create iced_futures and wire everything up

This commit is contained in:
Héctor Ramón Jiménez 2020-01-19 10:17:08 +01:00
parent d50ff9b5d9
commit b5b17ed4d8
23 changed files with 196 additions and 195 deletions

23
futures/Cargo.toml Normal file
View file

@ -0,0 +1,23 @@
[package]
name = "iced_futures"
version = "0.1.0-alpha"
authors = ["Héctor Ramón Jiménez <hector0193@gmail.com>"]
edition = "2018"
description = "Commands, subscriptions, and runtimes for Iced"
license = "MIT"
repository = "https://github.com/hecrj/iced"
documentation = "https://docs.rs/iced_futures"
keywords = ["gui", "ui", "graphics", "interface", "futures"]
categories = ["gui"]
[dependencies]
log = "0.4"
[dependencies.futures]
version = "0.3"
features = ["thread-pool"]
[dependencies.tokio]
version = "0.2"
optional = true
features = ["rt-core"]

100
futures/src/command.rs Normal file
View file

@ -0,0 +1,100 @@
use futures::future::{BoxFuture, Future, FutureExt};
/// A collection of async operations.
///
/// You should be able to turn a future easily into a [`Command`], either by
/// using the `From` trait or [`Command::perform`].
///
/// [`Command`]: struct.Command.html
pub struct Command<T> {
futures: Vec<BoxFuture<'static, T>>,
}
impl<T> Command<T> {
/// Creates an empty [`Command`].
///
/// In other words, a [`Command`] that does nothing.
///
/// [`Command`]: struct.Command.html
pub fn none() -> Self {
Self {
futures: Vec::new(),
}
}
/// Creates a [`Command`] that performs the action of the given future.
///
/// [`Command`]: struct.Command.html
pub fn perform<A>(
future: impl Future<Output = T> + 'static + Send,
f: impl Fn(T) -> A + 'static + Send,
) -> Command<A> {
Command {
futures: vec![future.map(f).boxed()],
}
}
/// Applies a transformation to the result of a [`Command`].
///
/// [`Command`]: struct.Command.html
pub fn map<A>(
mut self,
f: impl Fn(T) -> A + 'static + Send + Sync,
) -> Command<A>
where
T: 'static,
{
let f = std::sync::Arc::new(f);
Command {
futures: self
.futures
.drain(..)
.map(|future| {
let f = f.clone();
future.map(move |result| f(result)).boxed()
})
.collect(),
}
}
/// Creates a [`Command`] that performs the actions of all the given
/// commands.
///
/// Once this command is run, all the commands will be exectued at once.
///
/// [`Command`]: struct.Command.html
pub fn batch(commands: impl IntoIterator<Item = Command<T>>) -> Self {
Self {
futures: commands
.into_iter()
.flat_map(|command| command.futures)
.collect(),
}
}
/// Converts a [`Command`] into its underlying list of futures.
///
/// [`Command`]: struct.Command.html
pub fn futures(self) -> Vec<BoxFuture<'static, T>> {
self.futures
}
}
impl<T, A> From<A> for Command<T>
where
A: Future<Output = T> + 'static + Send,
{
fn from(future: A) -> Self {
Self {
futures: vec![future.boxed()],
}
}
}
impl<T> std::fmt::Debug for Command<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Command").finish()
}
}

8
futures/src/lib.rs Normal file
View file

@ -0,0 +1,8 @@
mod command;
pub mod runtime;
pub mod subscription;
pub use command::Command;
pub use runtime::Runtime;
pub use subscription::Subscription;

79
futures/src/runtime.rs Normal file
View file

@ -0,0 +1,79 @@
//! Run commands and subscriptions.
mod executor;
pub use executor::Executor;
use crate::{subscription, Command, Subscription};
use futures::Sink;
use std::marker::PhantomData;
#[derive(Debug)]
pub struct Runtime<Hasher, Event, Executor, Receiver, Message> {
executor: Executor,
receiver: Receiver,
subscriptions: subscription::Tracker<Hasher, Event>,
_message: PhantomData<Message>,
}
impl<Hasher, Event, Executor, Receiver, Message>
Runtime<Hasher, Event, Executor, Receiver, Message>
where
Hasher: std::hash::Hasher + Default,
Event: Send + Clone + 'static,
Executor: self::Executor,
Receiver: Sink<Message, Error = core::convert::Infallible>
+ Unpin
+ Send
+ Clone
+ 'static,
Message: Send + 'static,
{
pub fn new(executor: Executor, receiver: Receiver) -> Self {
Self {
executor,
receiver,
subscriptions: subscription::Tracker::new(),
_message: PhantomData,
}
}
pub fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
self.executor.enter(f)
}
pub fn spawn(&mut self, command: Command<Message>) {
use futures::{FutureExt, SinkExt};
let futures = command.futures();
for future in futures {
let mut receiver = self.receiver.clone();
self.executor.spawn(future.then(|message| {
async move {
let _ = receiver.send(message).await;
()
}
}));
}
}
pub fn track(
&mut self,
subscription: Subscription<Hasher, Event, Message>,
) {
let futures = self
.subscriptions
.update(subscription, self.receiver.clone());
for future in futures {
self.executor.spawn(future);
}
}
pub fn broadcast(&mut self, event: Event) {
self.subscriptions.broadcast(event);
}
}

View file

@ -0,0 +1,26 @@
use futures::Future;
pub trait Executor {
fn spawn(&self, future: impl Future<Output = ()> + Send + 'static);
fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
f()
}
}
impl Executor for futures::executor::ThreadPool {
fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
self.spawn_ok(future);
}
}
#[cfg(feature = "tokio")]
impl Executor for tokio::runtime::Runtime {
fn spawn(&self, future: impl Future<Output = ()> + Send + 'static) {
let _ = tokio::runtime::Runtime::spawn(self, future);
}
fn enter<R>(&self, f: impl FnOnce() -> R) -> R {
tokio::runtime::Runtime::enter(self, f)
}
}

188
futures/src/subscription.rs Normal file
View file

@ -0,0 +1,188 @@
//! Listen to external events in your application.
mod tracker;
pub use tracker::Tracker;
use futures::stream::BoxStream;
/// A request to listen to external events.
///
/// Besides performing async actions on demand with [`Command`], most
/// applications also need to listen to external events passively.
///
/// A [`Subscription`] is normally provided to some runtime, like a [`Command`],
/// and it will generate events as long as the user keeps requesting it.
///
/// For instance, you can use a [`Subscription`] to listen to a WebSocket
/// connection, keyboard presses, mouse events, time ticks, etc.
///
/// This type is normally aliased by runtimes with a specific `Input` and/or
/// `Hasher`.
///
/// [`Command`]: ../struct.Command.html
/// [`Subscription`]: struct.Subscription.html
pub struct Subscription<Hasher, Input, Output> {
recipes: Vec<Box<dyn Recipe<Hasher, Input, Output = Output>>>,
}
impl<H, I, O> Subscription<H, I, O>
where
H: std::hash::Hasher,
{
/// Returns an empty [`Subscription`] that will not produce any output.
///
/// [`Subscription`]: struct.Subscription.html
pub fn none() -> Self {
Self {
recipes: Vec::new(),
}
}
/// Creates a [`Subscription`] from a [`Recipe`] describing it.
///
/// [`Subscription`]: struct.Subscription.html
/// [`Recipe`]: trait.Recipe.html
pub fn from_recipe(
recipe: impl Recipe<H, I, Output = O> + 'static,
) -> Self {
Self {
recipes: vec![Box::new(recipe)],
}
}
/// Batches all the provided subscriptions and returns the resulting
/// [`Subscription`].
///
/// [`Subscription`]: struct.Subscription.html
pub fn batch(
subscriptions: impl IntoIterator<Item = Subscription<H, I, O>>,
) -> Self {
Self {
recipes: subscriptions
.into_iter()
.flat_map(|subscription| subscription.recipes)
.collect(),
}
}
/// Returns the different recipes of the [`Subscription`].
///
/// [`Subscription`]: struct.Subscription.html
pub fn recipes(self) -> Vec<Box<dyn Recipe<H, I, Output = O>>> {
self.recipes
}
/// Transforms the [`Subscription`] output with the given function.
///
/// [`Subscription`]: struct.Subscription.html
pub fn map<A>(
mut self,
f: impl Fn(O) -> A + Send + Sync + 'static,
) -> Subscription<H, I, A>
where
H: 'static,
I: 'static,
O: 'static,
A: 'static,
{
let function = std::sync::Arc::new(f);
Subscription {
recipes: self
.recipes
.drain(..)
.map(|recipe| {
Box::new(Map::new(recipe, function.clone()))
as Box<dyn Recipe<H, I, Output = A>>
})
.collect(),
}
}
}
impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Subscription").finish()
}
}
/// The description of a [`Subscription`].
///
/// A [`Recipe`] is the internal definition of a [`Subscription`]. It is used
/// by runtimes to run and identify subscriptions. You can use it to create your
/// own!
///
/// [`Subscription`]: struct.Subscription.html
/// [`Recipe`]: trait.Recipe.html
pub trait Recipe<Hasher: std::hash::Hasher, Input> {
/// The events that will be produced by a [`Subscription`] with this
/// [`Recipe`].
///
/// [`Subscription`]: struct.Subscription.html
/// [`Recipe`]: trait.Recipe.html
type Output;
/// Hashes the [`Recipe`].
///
/// This is used by runtimes to uniquely identify a [`Subscription`].
///
/// [`Subscription`]: struct.Subscription.html
/// [`Recipe`]: trait.Recipe.html
fn hash(&self, state: &mut Hasher);
/// Executes the [`Recipe`] and produces the stream of events of its
/// [`Subscription`].
///
/// It receives some generic `Input`, which is normally defined by runtimes.
///
/// [`Subscription`]: struct.Subscription.html
/// [`Recipe`]: trait.Recipe.html
fn stream(
self: Box<Self>,
input: BoxStream<'static, Input>,
) -> BoxStream<'static, Self::Output>;
}
struct Map<Hasher, Input, A, B> {
recipe: Box<dyn Recipe<Hasher, Input, Output = A>>,
mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync>,
}
impl<H, I, A, B> Map<H, I, A, B> {
fn new(
recipe: Box<dyn Recipe<H, I, Output = A>>,
mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync + 'static>,
) -> Self {
Map { recipe, mapper }
}
}
impl<H, I, A, B> Recipe<H, I> for Map<H, I, A, B>
where
A: 'static,
B: 'static,
H: std::hash::Hasher,
{
type Output = B;
fn hash(&self, state: &mut H) {
use std::hash::Hash;
std::any::TypeId::of::<B>().hash(state);
self.recipe.hash(state);
}
fn stream(
self: Box<Self>,
input: BoxStream<'static, I>,
) -> futures::stream::BoxStream<'static, Self::Output> {
use futures::StreamExt;
let mapper = self.mapper;
self.recipe
.stream(input)
.map(move |element| mapper(element))
.boxed()
}
}

View file

@ -0,0 +1,112 @@
use crate::Subscription;
use futures::{future::BoxFuture, sink::Sink};
use std::collections::HashMap;
use std::marker::PhantomData;
#[derive(Debug)]
pub struct Tracker<Hasher, Event> {
subscriptions: HashMap<u64, Execution<Event>>,
_hasher: PhantomData<Hasher>,
}
#[derive(Debug)]
pub struct Execution<Event> {
_cancel: futures::channel::oneshot::Sender<()>,
listener: Option<futures::channel::mpsc::Sender<Event>>,
}
impl<Hasher, Event> Tracker<Hasher, Event>
where
Hasher: std::hash::Hasher + Default,
Event: 'static + Send + Clone,
{
pub fn new() -> Self {
Self {
subscriptions: HashMap::new(),
_hasher: PhantomData,
}
}
pub fn update<Message, Receiver>(
&mut self,
subscription: Subscription<Hasher, Event, Message>,
receiver: Receiver,
) -> Vec<BoxFuture<'static, ()>>
where
Message: 'static + Send,
Receiver: 'static
+ Sink<Message, Error = core::convert::Infallible>
+ Unpin
+ Send
+ Clone,
{
use futures::{future::FutureExt, stream::StreamExt};
let mut futures = Vec::new();
let recipes = subscription.recipes();
let mut alive = std::collections::HashSet::new();
for recipe in recipes {
let id = {
let mut hasher = Hasher::default();
recipe.hash(&mut hasher);
hasher.finish()
};
let _ = alive.insert(id);
if self.subscriptions.contains_key(&id) {
continue;
}
let (cancel, cancelled) = futures::channel::oneshot::channel();
// TODO: Use bus if/when it supports async
let (event_sender, event_receiver) =
futures::channel::mpsc::channel(100);
let stream = recipe.stream(event_receiver.boxed());
let future = futures::future::select(
cancelled,
stream.map(Ok).forward(receiver.clone()),
)
.map(|_| ());
let _ = self.subscriptions.insert(
id,
Execution {
_cancel: cancel,
listener: if event_sender.is_closed() {
None
} else {
Some(event_sender)
},
},
);
futures.push(future.boxed());
}
self.subscriptions.retain(|id, _| alive.contains(&id));
futures
}
pub fn broadcast(&mut self, event: Event) {
self.subscriptions
.values_mut()
.filter_map(|connection| connection.listener.as_mut())
.for_each(|listener| {
if let Err(error) = listener.try_send(event.clone()) {
log::error!(
"Error sending event to subscription: {:?}",
error
);
}
});
}
}