Use oneshot and future::select to cancel streams

This commit is contained in:
Héctor Ramón Jiménez 2019-12-07 08:51:44 +01:00
parent e55dfa7551
commit 48145ba51e
3 changed files with 48 additions and 88 deletions

View file

@ -2,60 +2,51 @@
/// An event subscription. /// An event subscription.
pub struct Subscription<T> { pub struct Subscription<T> {
definitions: Vec<Box<dyn Definition<Message = T>>>, handles: Vec<Box<dyn Handle<Output = T>>>,
} }
impl<T> Subscription<T> { impl<T> Subscription<T> {
pub fn none() -> Self { pub fn none() -> Self {
Self { Self {
definitions: Vec::new(), handles: Vec::new(),
} }
} }
pub fn batch(subscriptions: impl Iterator<Item = Subscription<T>>) -> Self { pub fn batch(subscriptions: impl Iterator<Item = Subscription<T>>) -> Self {
Self { Self {
definitions: subscriptions handles: subscriptions
.flat_map(|subscription| subscription.definitions) .flat_map(|subscription| subscription.handles)
.collect(), .collect(),
} }
} }
pub fn definitions(self) -> Vec<Box<dyn Definition<Message = T>>> { pub fn handles(self) -> Vec<Box<dyn Handle<Output = T>>> {
self.definitions self.handles
} }
} }
impl<T, A> From<A> for Subscription<T> impl<T, A> From<A> for Subscription<T>
where where
A: Definition<Message = T> + 'static, A: Handle<Output = T> + 'static,
{ {
fn from(definition: A) -> Self { fn from(handle: A) -> Self {
Self { Self {
definitions: vec![Box::new(definition)], handles: vec![Box::new(handle)],
} }
} }
} }
/// The definition of an event subscription. /// The handle of an event subscription.
pub trait Definition { pub trait Handle {
type Message; type Output;
fn id(&self) -> u64; fn id(&self) -> u64;
fn stream( fn stream(&self) -> futures::stream::BoxStream<'static, Self::Output>;
&self,
) -> (
futures::stream::BoxStream<'static, Self::Message>,
Box<dyn Handle>,
);
}
pub trait Handle {
fn cancel(&mut self);
} }
impl<T> std::fmt::Debug for Subscription<T> { impl<T> std::fmt::Debug for Subscription<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("Command").finish() f.debug_struct("Subscription").finish()
} }
} }

View file

@ -82,7 +82,7 @@ impl Application for Clock {
} }
mod time { mod time {
use std::sync::{Arc, Mutex}; use std::sync::Arc;
pub fn every<Message>( pub fn every<Message>(
duration: std::time::Duration, duration: std::time::Duration,
@ -108,59 +108,30 @@ mod time {
>, >,
} }
struct TickState { impl<Message> iced::subscription::Handle for Tick<Message>
alive: Arc<Mutex<bool>>,
}
impl iced::subscription::Handle for TickState {
fn cancel(&mut self) {
match self.alive.lock() {
Ok(mut guard) => *guard = false,
_ => {}
}
}
}
impl<Message> iced::subscription::Definition for Tick<Message>
where where
Message: 'static, Message: 'static,
{ {
type Message = Message; type Output = Message;
fn id(&self) -> u64 { fn id(&self) -> u64 {
0 0
} }
fn stream( fn stream(&self) -> futures::stream::BoxStream<'static, Message> {
&self,
) -> (
futures::stream::BoxStream<'static, Message>,
Box<dyn iced::subscription::Handle>,
) {
use futures::StreamExt; use futures::StreamExt;
let duration = self.duration.clone(); let duration = self.duration.clone();
let function = self.message.clone(); let function = self.message.clone();
let alive = Arc::new(Mutex::new(true));
let state = TickState { let stream =
alive: alive.clone(), futures::stream::iter(std::iter::repeat(())).map(move |_| {
}; std::thread::sleep(duration);
let stream = futures::stream::poll_fn(move |_| { function(chrono::Local::now())
std::thread::sleep(duration); });
if !*alive.lock().unwrap() { stream.boxed()
return std::task::Poll::Ready(None);
}
let now = chrono::Local::now();
std::task::Poll::Ready(Some(now))
})
.map(move |time| function(time));
(stream.boxed(), Box::new(state))
} }
} }
} }

View file

@ -2,8 +2,8 @@ use crate::{
conversion, conversion,
input::{keyboard, mouse}, input::{keyboard, mouse},
renderer::{Target, Windowed}, renderer::{Target, Windowed},
subscription, Cache, Command, Container, Debug, Element, Event, Length, Cache, Command, Container, Debug, Element, Event, Length, MouseCursor,
MouseCursor, Settings, Subscription, UserInterface, Settings, Subscription, UserInterface,
}; };
use std::collections::HashMap; use std::collections::HashMap;
@ -420,7 +420,7 @@ fn spawn<Message: Send>(
} }
pub struct Subscriptions { pub struct Subscriptions {
alive: HashMap<u64, Box<dyn subscription::Handle>>, alive: HashMap<u64, futures::channel::oneshot::Sender<()>>,
} }
impl Subscriptions { impl Subscriptions {
@ -436,46 +436,44 @@ impl Subscriptions {
thread_pool: &mut futures::executor::ThreadPool, thread_pool: &mut futures::executor::ThreadPool,
proxy: &winit::event_loop::EventLoopProxy<Message>, proxy: &winit::event_loop::EventLoopProxy<Message>,
) { ) {
use futures::stream::StreamExt; use futures::{future::FutureExt, stream::StreamExt};
let definitions = subscriptions.definitions(); let handles = subscriptions.handles();
let mut alive = std::collections::HashSet::new(); let mut alive = std::collections::HashSet::new();
for definition in definitions { for handle in handles {
let id = definition.id(); let id = handle.id();
let _ = alive.insert(id); let _ = alive.insert(id);
if !self.alive.contains_key(&id) { if !self.alive.contains_key(&id) {
let (stream, handle) = definition.stream(); let (cancel, cancelled) = futures::channel::oneshot::channel();
let stream = handle.stream();
let proxy = let proxy =
std::sync::Arc::new(std::sync::Mutex::new(proxy.clone())); std::sync::Arc::new(std::sync::Mutex::new(proxy.clone()));
let future = stream.for_each(move |message| { let future = futures::future::select(
proxy cancelled,
.lock() stream.for_each(move |message| {
.expect("Acquire event loop proxy lock") proxy
.send_event(message) .lock()
.expect("Send subscription result to event loop"); .expect("Acquire event loop proxy lock")
.send_event(message)
.expect("Send subscription result to event loop");
futures::future::ready(()) futures::future::ready(())
}); }),
)
.map(|_| ());
thread_pool.spawn_ok(future); thread_pool.spawn_ok(future);
let _ = self.alive.insert(id, handle); let _ = self.alive.insert(id, cancel);
} }
} }
self.alive.retain(|id, handle| { self.alive.retain(|id, _| alive.contains(&id));
let is_still_alive = alive.contains(&id);
if !is_still_alive {
handle.cancel();
}
is_still_alive
});
} }
} }