Implement subscription::Tracker in iced_core
This commit is contained in:
parent
6ca5e6184f
commit
32f7ca261f
7 changed files with 131 additions and 8 deletions
|
|
@ -11,7 +11,8 @@ repository = "https://github.com/hecrj/iced"
|
||||||
# Exposes a future-based `Command` type
|
# Exposes a future-based `Command` type
|
||||||
command = ["futures"]
|
command = ["futures"]
|
||||||
# Exposes a future-based `Subscription` type
|
# Exposes a future-based `Subscription` type
|
||||||
subscription = ["futures"]
|
subscription = ["futures", "log"]
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
futures = { version = "0.3", optional = true }
|
futures = { version = "0.3", optional = true }
|
||||||
|
log = { version = "0.4", optional = true }
|
||||||
|
|
|
||||||
|
|
@ -9,7 +9,7 @@
|
||||||
//! [Iced]: https://github.com/hecrj/iced
|
//! [Iced]: https://github.com/hecrj/iced
|
||||||
//! [`iced_native`]: https://github.com/hecrj/iced/tree/master/native
|
//! [`iced_native`]: https://github.com/hecrj/iced/tree/master/native
|
||||||
//! [`iced_web`]: https://github.com/hecrj/iced/tree/master/web
|
//! [`iced_web`]: https://github.com/hecrj/iced/tree/master/web
|
||||||
#![deny(missing_docs)]
|
//#![deny(missing_docs)]
|
||||||
#![deny(missing_debug_implementations)]
|
#![deny(missing_debug_implementations)]
|
||||||
#![deny(unused_results)]
|
#![deny(unused_results)]
|
||||||
#![deny(unsafe_code)]
|
#![deny(unsafe_code)]
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,9 @@
|
||||||
//! Listen to external events in your application.
|
//! Listen to external events in your application.
|
||||||
|
mod tracker;
|
||||||
|
|
||||||
|
pub use tracker::Tracker;
|
||||||
|
|
||||||
|
use futures::stream::BoxStream;
|
||||||
|
|
||||||
/// A request to listen to external events.
|
/// A request to listen to external events.
|
||||||
///
|
///
|
||||||
|
|
@ -134,8 +139,8 @@ pub trait Recipe<Hasher: std::hash::Hasher, Input> {
|
||||||
/// [`Recipe`]: trait.Recipe.html
|
/// [`Recipe`]: trait.Recipe.html
|
||||||
fn stream(
|
fn stream(
|
||||||
self: Box<Self>,
|
self: Box<Self>,
|
||||||
input: Input,
|
input: BoxStream<'static, Input>,
|
||||||
) -> futures::stream::BoxStream<'static, Self::Output>;
|
) -> BoxStream<'static, Self::Output>;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct Map<Hasher, Input, A, B> {
|
struct Map<Hasher, Input, A, B> {
|
||||||
|
|
@ -169,7 +174,7 @@ where
|
||||||
|
|
||||||
fn stream(
|
fn stream(
|
||||||
self: Box<Self>,
|
self: Box<Self>,
|
||||||
input: I,
|
input: BoxStream<'static, I>,
|
||||||
) -> futures::stream::BoxStream<'static, Self::Output> {
|
) -> futures::stream::BoxStream<'static, Self::Output> {
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
|
||||||
|
|
|
||||||
112
core/src/subscription/tracker.rs
Normal file
112
core/src/subscription/tracker.rs
Normal file
|
|
@ -0,0 +1,112 @@
|
||||||
|
use crate::Subscription;
|
||||||
|
|
||||||
|
use futures::{future::BoxFuture, sink::Sink};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Tracker<Hasher, Event> {
|
||||||
|
subscriptions: HashMap<u64, Execution<Event>>,
|
||||||
|
_hasher: PhantomData<Hasher>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct Execution<Event> {
|
||||||
|
_cancel: futures::channel::oneshot::Sender<()>,
|
||||||
|
listener: Option<futures::channel::mpsc::Sender<Event>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<Hasher, Event> Tracker<Hasher, Event>
|
||||||
|
where
|
||||||
|
Hasher: std::hash::Hasher + Default,
|
||||||
|
Event: 'static + Send + Clone,
|
||||||
|
{
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
subscriptions: HashMap::new(),
|
||||||
|
_hasher: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update<Message, S>(
|
||||||
|
&mut self,
|
||||||
|
subscription: Subscription<Hasher, Event, Message>,
|
||||||
|
sink: S,
|
||||||
|
) -> Vec<BoxFuture<'static, ()>>
|
||||||
|
where
|
||||||
|
Message: 'static + Send,
|
||||||
|
S: 'static
|
||||||
|
+ Sink<Message, Error = core::convert::Infallible>
|
||||||
|
+ Unpin
|
||||||
|
+ Send
|
||||||
|
+ Clone,
|
||||||
|
{
|
||||||
|
use futures::{future::FutureExt, stream::StreamExt};
|
||||||
|
|
||||||
|
let mut futures = Vec::new();
|
||||||
|
|
||||||
|
let recipes = subscription.recipes();
|
||||||
|
let mut alive = std::collections::HashSet::new();
|
||||||
|
|
||||||
|
for recipe in recipes {
|
||||||
|
let id = {
|
||||||
|
let mut hasher = Hasher::default();
|
||||||
|
recipe.hash(&mut hasher);
|
||||||
|
|
||||||
|
hasher.finish()
|
||||||
|
};
|
||||||
|
|
||||||
|
let _ = alive.insert(id);
|
||||||
|
|
||||||
|
if self.subscriptions.contains_key(&id) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
let (cancel, cancelled) = futures::channel::oneshot::channel();
|
||||||
|
|
||||||
|
// TODO: Use bus if/when it supports async
|
||||||
|
let (event_sender, event_receiver) =
|
||||||
|
futures::channel::mpsc::channel(100);
|
||||||
|
|
||||||
|
let stream = recipe.stream(event_receiver.boxed());
|
||||||
|
|
||||||
|
let future = futures::future::select(
|
||||||
|
cancelled,
|
||||||
|
stream.map(Ok).forward(sink.clone()),
|
||||||
|
)
|
||||||
|
.map(|_| ());
|
||||||
|
|
||||||
|
let _ = self.subscriptions.insert(
|
||||||
|
id,
|
||||||
|
Execution {
|
||||||
|
_cancel: cancel,
|
||||||
|
listener: if event_sender.is_closed() {
|
||||||
|
None
|
||||||
|
} else {
|
||||||
|
Some(event_sender)
|
||||||
|
},
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
futures.push(future.boxed());
|
||||||
|
}
|
||||||
|
|
||||||
|
self.subscriptions.retain(|id, _| alive.contains(&id));
|
||||||
|
|
||||||
|
futures
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn broadcast(&mut self, event: Event) {
|
||||||
|
self.subscriptions
|
||||||
|
.values_mut()
|
||||||
|
.filter_map(|connection| connection.listener.as_mut())
|
||||||
|
.for_each(|listener| {
|
||||||
|
if let Err(error) = listener.try_send(event.clone()) {
|
||||||
|
log::error!(
|
||||||
|
"Error sending event to subscription: {:?}",
|
||||||
|
error
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -165,7 +165,7 @@ mod time {
|
||||||
|
|
||||||
fn stream(
|
fn stream(
|
||||||
self: Box<Self>,
|
self: Box<Self>,
|
||||||
_input: I,
|
_input: futures::stream::BoxStream<'static, I>,
|
||||||
) -> futures::stream::BoxStream<'static, Self::Output> {
|
) -> futures::stream::BoxStream<'static, Self::Output> {
|
||||||
use futures::stream::StreamExt;
|
use futures::stream::StreamExt;
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ use futures::stream::BoxStream;
|
||||||
///
|
///
|
||||||
/// [`Command`]: ../struct.Command.html
|
/// [`Command`]: ../struct.Command.html
|
||||||
/// [`Subscription`]: struct.Subscription.html
|
/// [`Subscription`]: struct.Subscription.html
|
||||||
pub type Subscription<T> = iced_core::Subscription<Hasher, EventStream, T>;
|
pub type Subscription<T> = iced_core::Subscription<Hasher, Event, T>;
|
||||||
|
|
||||||
/// A stream of runtime events.
|
/// A stream of runtime events.
|
||||||
///
|
///
|
||||||
|
|
@ -24,6 +24,11 @@ pub type Subscription<T> = iced_core::Subscription<Hasher, EventStream, T>;
|
||||||
/// [`Subscription`]: type.Subscription.html
|
/// [`Subscription`]: type.Subscription.html
|
||||||
pub type EventStream = BoxStream<'static, Event>;
|
pub type EventStream = BoxStream<'static, Event>;
|
||||||
|
|
||||||
|
/// A native [`Subscription`] tracker.
|
||||||
|
///
|
||||||
|
/// [`Subscription`]: type.Subscription.html
|
||||||
|
pub type Tracker = iced_core::subscription::Tracker<Hasher, Event>;
|
||||||
|
|
||||||
pub use iced_core::subscription::Recipe;
|
pub use iced_core::subscription::Recipe;
|
||||||
|
|
||||||
mod events;
|
mod events;
|
||||||
|
|
|
||||||
|
|
@ -5,7 +5,7 @@ use crate::{
|
||||||
|
|
||||||
pub struct Events;
|
pub struct Events;
|
||||||
|
|
||||||
impl Recipe<Hasher, EventStream> for Events {
|
impl Recipe<Hasher, Event> for Events {
|
||||||
type Output = Event;
|
type Output = Event;
|
||||||
|
|
||||||
fn hash(&self, state: &mut Hasher) {
|
fn hash(&self, state: &mut Hasher) {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue