Draft websocket example 🎉
This commit is contained in:
parent
2f557731f3
commit
dc50a2830a
8 changed files with 426 additions and 73 deletions
|
|
@ -2,7 +2,6 @@
|
|||
use crate::event::{self, Event};
|
||||
use crate::Hasher;
|
||||
|
||||
use iced_futures::futures::channel::mpsc;
|
||||
use iced_futures::futures::{self, Future, Stream};
|
||||
use iced_futures::BoxStream;
|
||||
|
||||
|
|
@ -59,21 +58,9 @@ pub fn events_with<Message>(
|
|||
where
|
||||
Message: 'static + Send,
|
||||
{
|
||||
#[derive(Debug, Clone, Copy, Hash)]
|
||||
struct Events(u64);
|
||||
|
||||
let hash = {
|
||||
use std::hash::Hasher as _;
|
||||
|
||||
let mut hasher = Hasher::default();
|
||||
|
||||
f.hash(&mut hasher);
|
||||
|
||||
hasher.finish()
|
||||
};
|
||||
|
||||
Subscription::from_recipe(Runner {
|
||||
initial: Events(hash),
|
||||
id: f,
|
||||
initial: (),
|
||||
spawn: move |_, events| {
|
||||
use futures::future;
|
||||
use futures::stream::StreamExt;
|
||||
|
|
@ -89,16 +76,19 @@ where
|
|||
/// [`Stream`] returned by the provided closure.
|
||||
///
|
||||
/// The `initial` state will be used to uniquely identify the [`Subscription`].
|
||||
pub fn run<T, S, Message>(
|
||||
pub fn run<I, T, S, Message>(
|
||||
id: I,
|
||||
initial: T,
|
||||
f: impl FnOnce(T) -> S + 'static,
|
||||
) -> Subscription<Message>
|
||||
where
|
||||
Message: 'static,
|
||||
T: Clone + Hash + 'static,
|
||||
I: Hash + 'static,
|
||||
T: 'static,
|
||||
S: Stream<Item = Message> + Send + 'static,
|
||||
Message: 'static,
|
||||
{
|
||||
Subscription::from_recipe(Runner {
|
||||
id,
|
||||
initial,
|
||||
spawn: move |initial, _| f(initial),
|
||||
})
|
||||
|
|
@ -108,79 +98,41 @@ where
|
|||
/// [`Stream`] that will call the provided closure to produce every `Message`.
|
||||
///
|
||||
/// The `initial` state will be used to uniquely identify the [`Subscription`].
|
||||
pub fn unfold<T, Fut, Message>(
|
||||
pub fn unfold<I, T, Fut, Message>(
|
||||
id: I,
|
||||
initial: T,
|
||||
mut f: impl FnMut(T) -> Fut + Send + Sync + 'static,
|
||||
) -> Subscription<Message>
|
||||
where
|
||||
Message: 'static,
|
||||
T: Clone + Hash + Send + 'static,
|
||||
Fut: Future<Output = (Message, T)> + Send + 'static,
|
||||
I: Hash + 'static,
|
||||
T: Send + 'static,
|
||||
Fut: Future<Output = (Option<Message>, T)> + Send + 'static,
|
||||
Message: 'static + Send,
|
||||
{
|
||||
use futures::future::FutureExt;
|
||||
|
||||
run(initial, move |initial| {
|
||||
futures::stream::unfold(initial, move |state| f(state).map(Some))
|
||||
})
|
||||
}
|
||||
|
||||
/// Returns a [`Subscription`] that will open a channel and asynchronously run a
|
||||
/// [`Stream`] that will call the provided closure to produce every `Message`.
|
||||
///
|
||||
/// When the [`Subscription`] starts, an `on_ready` message will be produced
|
||||
/// containing the [`mpsc::Sender`] end of the channel, which can be used by
|
||||
/// the parent application to send `Input` to the running [`Subscription`].
|
||||
///
|
||||
/// The provided closure should use the [`mpsc::Receiver`] argument to await for
|
||||
/// any `Input`.
|
||||
///
|
||||
/// This function is really useful to create asynchronous workers with
|
||||
/// bidirectional communication with a parent application.
|
||||
///
|
||||
/// The `initial` state will be used to uniquely identify the [`Subscription`].
|
||||
pub fn worker<T, Fut, Message, Input>(
|
||||
initial: T,
|
||||
on_ready: impl FnOnce(mpsc::Sender<Input>) -> Message + 'static,
|
||||
f: impl FnMut(T, &mut mpsc::Receiver<Input>) -> Fut + Send + Sync + 'static,
|
||||
) -> Subscription<Message>
|
||||
where
|
||||
T: Clone + Hash + Send + 'static,
|
||||
Fut: Future<Output = (Message, T)> + Send + 'static,
|
||||
Message: Send + 'static,
|
||||
Input: Send + 'static,
|
||||
{
|
||||
use futures::future;
|
||||
use futures::future::{self, FutureExt};
|
||||
use futures::stream::StreamExt;
|
||||
|
||||
run(initial, move |initial| {
|
||||
let (sender, receiver) = mpsc::channel(100);
|
||||
|
||||
futures::stream::once(future::ready(on_ready(sender))).chain(
|
||||
futures::stream::unfold(
|
||||
(f, initial, receiver),
|
||||
move |(mut f, state, mut receiver)| async {
|
||||
let (message, state) = f(state, &mut receiver).await;
|
||||
|
||||
Some((message, (f, state, receiver)))
|
||||
},
|
||||
),
|
||||
)
|
||||
run(id, initial, move |initial| {
|
||||
futures::stream::unfold(initial, move |state| f(state).map(Some))
|
||||
.filter_map(future::ready)
|
||||
})
|
||||
}
|
||||
|
||||
struct Runner<T, F, S, Message>
|
||||
struct Runner<I, T, F, S, Message>
|
||||
where
|
||||
F: FnOnce(T, EventStream) -> S,
|
||||
S: Stream<Item = Message>,
|
||||
{
|
||||
id: I,
|
||||
initial: T,
|
||||
spawn: F,
|
||||
}
|
||||
|
||||
impl<T, S, F, Message> Recipe<Hasher, (Event, event::Status)>
|
||||
for Runner<T, F, S, Message>
|
||||
impl<I, T, S, F, Message> Recipe<Hasher, (Event, event::Status)>
|
||||
for Runner<I, T, F, S, Message>
|
||||
where
|
||||
T: Clone + Hash + 'static,
|
||||
I: Hash + 'static,
|
||||
T: 'static,
|
||||
F: FnOnce(T, EventStream) -> S,
|
||||
S: Stream<Item = Message> + Send + 'static,
|
||||
{
|
||||
|
|
@ -189,7 +141,7 @@ where
|
|||
fn hash(&self, state: &mut Hasher) {
|
||||
std::any::TypeId::of::<T>().hash(state);
|
||||
|
||||
self.initial.hash(state);
|
||||
self.id.hash(state);
|
||||
}
|
||||
|
||||
fn stream(self: Box<Self>, input: EventStream) -> BoxStream<Self::Output> {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue