Implement Subscription::map and from_recipe

This commit is contained in:
Héctor Ramón Jiménez 2019-12-10 03:43:00 +01:00
parent e189c22bb0
commit cdb7acf6c2
4 changed files with 130 additions and 65 deletions

View file

@ -1,60 +1,125 @@
//! Generate events asynchronously for you application. //! Generate events asynchronously for you application.
/// An event subscription. /// An event subscription.
pub struct Subscription<I, O> { pub struct Subscription<Hasher, Input, Output> {
connections: Vec<Box<dyn Connection<Input = I, Output = O>>>, recipes: Vec<Box<dyn Recipe<Hasher, Input, Output = Output>>>,
} }
impl<I, O> Subscription<I, O> { impl<H, I, O> Subscription<H, I, O>
where
H: std::hash::Hasher,
{
pub fn none() -> Self { pub fn none() -> Self {
Self { Self {
connections: Vec::new(), recipes: Vec::new(),
}
}
pub fn from_recipe(
recipe: impl Recipe<H, I, Output = O> + 'static,
) -> Self {
Self {
recipes: vec![Box::new(recipe)],
} }
} }
pub fn batch( pub fn batch(
subscriptions: impl Iterator<Item = Subscription<I, O>>, subscriptions: impl Iterator<Item = Subscription<H, I, O>>,
) -> Self { ) -> Self {
Self { Self {
connections: subscriptions recipes: subscriptions
.flat_map(|subscription| subscription.connections) .flat_map(|subscription| subscription.recipes)
.collect(), .collect(),
} }
} }
pub fn connections( pub fn recipes(self) -> Vec<Box<dyn Recipe<H, I, Output = O>>> {
self, self.recipes
) -> Vec<Box<dyn Connection<Input = I, Output = O>>> {
self.connections
} }
}
impl<I, O, T> From<T> for Subscription<I, O> pub fn map<A>(
where mut self,
T: Connection<Input = I, Output = O> + 'static, f: impl Fn(O) -> A + Send + Sync + 'static,
{ ) -> Subscription<H, I, A>
fn from(handle: T) -> Self { where
Self { H: 'static,
connections: vec![Box::new(handle)], I: 'static,
O: 'static,
A: 'static,
{
let function = std::sync::Arc::new(f);
Subscription {
recipes: self
.recipes
.drain(..)
.map(|recipe| {
Box::new(Map::new(recipe, function.clone()))
as Box<dyn Recipe<H, I, Output = A>>
})
.collect(),
} }
} }
} }
/// The connection of an event subscription. impl<I, O, H> std::fmt::Debug for Subscription<I, O, H> {
pub trait Connection {
type Input;
type Output;
fn id(&self) -> u64;
fn stream(
&self,
input: Self::Input,
) -> futures::stream::BoxStream<'static, Self::Output>;
}
impl<I, O> std::fmt::Debug for Subscription<I, O> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Subscription").finish() f.debug_struct("Subscription").finish()
} }
} }
/// The connection of an event subscription.
pub trait Recipe<Hasher: std::hash::Hasher, Input> {
type Output;
fn hash(&self, state: &mut Hasher);
fn stream(
&self,
input: Input,
) -> futures::stream::BoxStream<'static, Self::Output>;
}
struct Map<Hasher, Input, A, B> {
recipe: Box<dyn Recipe<Hasher, Input, Output = A>>,
mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync>,
}
impl<H, I, A, B> Map<H, I, A, B> {
fn new(
recipe: Box<dyn Recipe<H, I, Output = A>>,
mapper: std::sync::Arc<dyn Fn(A) -> B + Send + Sync + 'static>,
) -> Self {
Map { recipe, mapper }
}
}
impl<H, I, A, B> Recipe<H, I> for Map<H, I, A, B>
where
A: 'static,
B: 'static,
H: std::hash::Hasher,
{
type Output = B;
fn hash(&self, state: &mut H) {
use std::hash::Hash;
std::any::TypeId::of::<B>().hash(state);
self.recipe.hash(state);
}
fn stream(
&self,
input: I,
) -> futures::stream::BoxStream<'static, Self::Output> {
use futures::StreamExt;
let mapper = self.mapper.clone();
self.recipe
.stream(input)
.map(move |element| mapper(element))
.boxed()
}
}

View file

@ -51,7 +51,7 @@ impl Application for Events {
fn subscriptions(&self) -> Subscription<Message> { fn subscriptions(&self) -> Subscription<Message> {
if self.enabled { if self.enabled {
events::all(Message::EventOccurred) events::all().map(Message::EventOccurred)
} else { } else {
Subscription::none() Subscription::none()
} }
@ -89,41 +89,33 @@ impl Application for Events {
} }
mod events { mod events {
use std::sync::Arc; pub fn all() -> iced::Subscription<iced_native::Event> {
iced::Subscription::from_recipe(All)
pub fn all<Message>(
f: impl Fn(iced_native::Event) -> Message + 'static + Send + Sync,
) -> iced::Subscription<Message>
where
Message: Send + 'static,
{
All(Arc::new(f)).into()
} }
struct All<Message>( struct All;
Arc<dyn Fn(iced_native::Event) -> Message + Send + Sync>,
);
impl<Message> iced_native::subscription::Connection for All<Message> impl<H>
iced_native::subscription::Recipe<H, iced_native::subscription::Input>
for All
where where
Message: 'static, H: std::hash::Hasher,
{ {
type Input = iced_native::subscription::Input; type Output = iced_native::Event;
type Output = Message;
fn id(&self) -> u64 { fn hash(&self, state: &mut H) {
0 use std::hash::Hash;
std::any::TypeId::of::<All>().hash(state);
} }
fn stream( fn stream(
&self, &self,
input: iced_native::subscription::Input, input: iced_native::subscription::Input,
) -> futures::stream::BoxStream<'static, Message> { ) -> futures::stream::BoxStream<'static, Self::Output> {
use futures::StreamExt; use futures::StreamExt;
let function = self.0.clone(); input.boxed()
input.map(move |event| function(event)).boxed()
} }
} }
} }

View file

@ -1,6 +1,6 @@
use crate::Event; use crate::{Event, Hasher};
pub type Subscription<T> = iced_core::Subscription<Input, T>; pub type Subscription<T> = iced_core::Subscription<Hasher, Input, T>;
pub type Input = futures::channel::mpsc::Receiver<Event>; pub type Input = futures::channel::mpsc::Receiver<Event>;
pub use iced_core::subscription::Connection; pub use iced_core::subscription::Recipe;

View file

@ -2,8 +2,8 @@ use crate::{
conversion, conversion,
input::{keyboard, mouse}, input::{keyboard, mouse},
renderer::{Target, Windowed}, renderer::{Target, Windowed},
Cache, Command, Container, Debug, Element, Event, Length, MouseCursor, Cache, Command, Container, Debug, Element, Event, Hasher, Length,
Settings, Subscription, UserInterface, MouseCursor, Settings, Subscription, UserInterface,
}; };
use std::collections::HashMap; use std::collections::HashMap;
@ -448,11 +448,19 @@ impl Subscriptions {
) { ) {
use futures::{future::FutureExt, stream::StreamExt}; use futures::{future::FutureExt, stream::StreamExt};
let connections = subscriptions.connections(); let recipes = subscriptions.recipes();
let mut alive = std::collections::HashSet::new(); let mut alive = std::collections::HashSet::new();
for connection in connections { for recipe in recipes {
let id = connection.id(); let id = {
use std::hash::Hasher as _;
let mut hasher = Hasher::default();
recipe.hash(&mut hasher);
hasher.finish()
};
let _ = alive.insert(id); let _ = alive.insert(id);
if !self.alive.contains_key(&id) { if !self.alive.contains_key(&id) {
@ -460,7 +468,7 @@ impl Subscriptions {
let (event_sender, event_receiver) = let (event_sender, event_receiver) =
futures::channel::mpsc::channel(100); futures::channel::mpsc::channel(100);
let stream = connection.stream(event_receiver); let stream = recipe.stream(event_receiver);
let proxy = let proxy =
std::sync::Arc::new(std::sync::Mutex::new(proxy.clone())); std::sync::Arc::new(std::sync::Mutex::new(proxy.clone()));